From 8c56ffca1940a5e99240f4af82781f65f13e4b4d Mon Sep 17 00:00:00 2001 From: Quentin De Coninck Date: Thu, 14 Apr 2016 15:41:50 +0200 Subject: [PATCH] mpExperienceSiri With Java client and Python server --- src/SiriClient.java | 365 ++++++++++++++++++++++++++++++++++++++++ src/mpExperienceSiri.py | 81 +++++++++ src/mpParamXp.py | 18 ++ src/siriClient.jar | Bin 0 -> 14079 bytes src/siri_server.py | 153 +++++++++++++++++ 5 files changed, 617 insertions(+) create mode 100644 src/SiriClient.java create mode 100644 src/mpExperienceSiri.py create mode 100644 src/siriClient.jar create mode 100644 src/siri_server.py diff --git a/src/SiriClient.java b/src/SiriClient.java new file mode 100644 index 0000000..366cf1d --- /dev/null +++ b/src/SiriClient.java @@ -0,0 +1,365 @@ +import java.io.*; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.Socket; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.Collections; + +public class SiriClient { + + private String serverMessage; + + /* Client parameters */ + public final String SERVERIP; //your computer IP address + public final int SERVERPORT; + public final int RUN_TIME; // In seconds + public final int QUERY_SIZE; + public final int RESPONSE_SIZE; + public final int DELAY_QUERY_RESPONSE; + public final int MIN_PAYLOAD_SIZE; + public final int MAX_PAYLOAD_SIZE; + public final int INTERVAL_TIME_MS; + public final int BUFFER_SIZE; + + private boolean mRun = false; + private int messageId = 0; + private static final int MAX_ID = 100; + + private Random random; + private long[] sentTime; + private List delayTime; + private String mac; + private int counter; + private int missed; + PrintWriter out; + BufferedReader in; + OutputStream outputStream; + OutputStreamWriter osw; + Socket socket; + + /** + * Constructor of the class. OnMessagedReceived listens for the messages received from server + */ + public SiriClient(String serverIp, int serverPort, int runTime, int querySize, int responseSize, + int delayQueryResponse, int minPayloadSize, int maxPayloadSize, int intervalTimeMs, + int bufferSize) { + random = new Random(); + sentTime = new long[MAX_ID]; + delayTime = new ArrayList<>(); + counter = 0; + missed = 0; + /* Client parameters */ + SERVERIP = serverIp; + SERVERPORT = serverPort; + RUN_TIME = runTime; + QUERY_SIZE = querySize; + RESPONSE_SIZE = responseSize; + DELAY_QUERY_RESPONSE = delayQueryResponse; + MIN_PAYLOAD_SIZE = minPayloadSize; + MAX_PAYLOAD_SIZE = maxPayloadSize; + INTERVAL_TIME_MS = intervalTimeMs; + BUFFER_SIZE = bufferSize; + } + + public SiriClient(String serverIp, int serverPort, int runTime) { + this(serverIp, serverPort, runTime, 2500, 750, 0, 85, 500, 333, 9); + } + + protected String getStringWithLengthAndFilledWithCharacter(int length, char charToFill) { + if (length > 0) { + char[] array = new char[length]; + Arrays.fill(array, charToFill); + return new String(array); + } + return ""; + } + + /** + * Return a random number in [@MIN_PAYLOAD_SIZE, @MAX_PAYLOAD_SIZE] + * It manages 3 cases: + * 1) remainToSend in [@MIN_PAYLOAD_SIZE, @MAX_PAYLOAD_SIZE]: return remainToSend + * 2) remainToSend - @MAX_PAYLOAD_SIZE < MIN_PAYLOAD_SIZE: return random value in + * [@MIN_PAYLOAD_SIZE, @MAX_PAYLOAD_SIZE - @MIN_PAYLOAD_SIZE] + * 3) else, return random value in [@MIN_PAYLOAD_SIZE, @MAX_PAYLOAD_SIZE] + * @param remainToSend number of remaining bytes to send >= MIN_PAYLOAD_SIZE + */ + private int sizeOfNextPacket(int remainToSend) { + if (remainToSend < MIN_PAYLOAD_SIZE) throw new AssertionError(); + + // Case 1) + if (remainToSend <= MAX_PAYLOAD_SIZE && remainToSend >= MIN_PAYLOAD_SIZE) { + return remainToSend; + } + + int randomPart; + + // Case 2) + if (remainToSend - MAX_PAYLOAD_SIZE < MIN_PAYLOAD_SIZE) { + randomPart = random.nextInt(MAX_PAYLOAD_SIZE - 2 * MIN_PAYLOAD_SIZE + 1); + } + // Case 3) + else { + randomPart = random.nextInt(MAX_PAYLOAD_SIZE - MIN_PAYLOAD_SIZE + 1); + } + + return MIN_PAYLOAD_SIZE + randomPart; + } + + /** + * Get a random value following a Poisson distribution of mean lambda + * @param lambda mean of the Poisson distribution + * @return random value following a Poisson distribution of mean lambda + */ + public static int getPoisson(double lambda) { + double L = Math.exp(-lambda); + double p = 1.0; + int k = 0; + + do { + k++; + p *= Math.random(); + } while (p > L); + + return k - 1; + } + + /** + * Sends the message entered by client to the server + */ + public void sendMessage() { + // if (out != null && !out.checkError() && osw != null) { + if (socket != null && !socket.isClosed()) { + int remainToBeSent = QUERY_SIZE; + // If the server has a DB, use it, otherwise set to 0 + //int delaysToSend = delayTime.size(); + int delaysToSend = 0; + StringBuffer sb = new StringBuffer(); +// for (int i = 0; i < delaysToSend; i++) { +// sb.append(delayTime.get(0) + "&"); +// delayTime.remove(delayTime.get(0)); +// } + sentTime[messageId] = System.currentTimeMillis(); + String startString = messageId + "&" + QUERY_SIZE + "&" + RESPONSE_SIZE + "&" + + DELAY_QUERY_RESPONSE + "&" + delaysToSend + "&" + sentTime[messageId] + + "&" + mac + "&" + sb.toString(); + messageId = (messageId + 1) % MAX_ID; + int bytesToSend = Math.max(startString.length(), sizeOfNextPacket(remainToBeSent)); + // System.err.println("BytesToSend: " + bytesToSend); + byte[] toSend; + try { + //osw.write(startString + getStringWithLengthAndFilledWithCharacter(bytesToSend - startString.length(), '0')); + //osw.flush(); + toSend = (startString + getStringWithLengthAndFilledWithCharacter(bytesToSend - startString.length(), '0')).getBytes(StandardCharsets.US_ASCII); + outputStream.write(toSend); + outputStream.flush(); + } catch (IOException e) { + System.err.println("ERROR: OUTPUT STREAM ERROR"); + } + // out.println(startString + getStringWithLengthAndFilledWithCharacter(bytesToSend - startString.length() - 1, '0')); + // System.err.println("Sent " + startString + getStringWithLengthAndFilledWithCharacter(bytesToSend - startString.length() - 1, '0')); + // out.flush(); + remainToBeSent -= bytesToSend; + synchronized (this) { + counter += bytesToSend; + } + + try { + while(remainToBeSent > 0) { + bytesToSend = sizeOfNextPacket(remainToBeSent); + //out.println(getStringWithLengthAndFilledWithCharacter(bytesToSend - 1, '0')); + //out.flush(); + + if (remainToBeSent == bytesToSend) { + //osw.write(getStringWithLengthAndFilledWithCharacter(bytesToSend - 1, '0') + "\n"); + toSend = (getStringWithLengthAndFilledWithCharacter(bytesToSend - 1, '0') + "\n").getBytes(StandardCharsets.US_ASCII); + } else { + //osw.write(getStringWithLengthAndFilledWithCharacter(bytesToSend, '0')); + toSend = getStringWithLengthAndFilledWithCharacter(bytesToSend, '0').getBytes(StandardCharsets.US_ASCII); + } + //osw.flush(); + outputStream.write(toSend); + outputStream.flush(); + + remainToBeSent -= bytesToSend; + synchronized (this) { + counter += bytesToSend; + } + } + } catch (IOException e) { + System.err.println("ERROR: OUTPUT STREAM ERROR"); + e.printStackTrace(); + } + } + } + + public void stopClient(){ + mRun = false; + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public static String getIPAddress(boolean useIPv4) { + try { + List interfaces = Collections.list(NetworkInterface.getNetworkInterfaces()); + for (NetworkInterface intf : interfaces) { + List addrs = Collections.list(intf.getInetAddresses()); + for (InetAddress addr : addrs) { + if (!addr.isLoopbackAddress()) { + String sAddr = addr.getHostAddress(); + //boolean isIPv4 = InetAddressUtils.isIPv4Address(sAddr); + boolean isIPv4 = sAddr.indexOf(':')<0; + + if (useIPv4) { + if (isIPv4) + return sAddr; + } else { + if (!isIPv4) { + int delim = sAddr.indexOf('%'); // drop ip6 zone suffix + return delim<0 ? sAddr.toUpperCase() : sAddr.substring(0, delim).toUpperCase(); + } + } + } + } + } + } catch (Exception ex) { } // for now eat exceptions + return ""; + } + + public void run(String macWifi) { + + mRun = true; + mac = macWifi; + + try { + //here you must put your computer's IP address. + InetAddress serverAddr = InetAddress.getByName(SERVERIP); + System.err.println("Me: " + getIPAddress(true)); + + System.err.println("TCP Client: Connecting..."); + + //create a socket to make the connection with the server + socket = new Socket(serverAddr, SERVERPORT); + // Needed to emulate any traffic + socket.setTcpNoDelay(true); + + new Thread(new Runnable() { + public void run() { + final long startTime = System.currentTimeMillis(); + while (socket != null && !socket.isClosed()) { + try { + Thread.sleep(INTERVAL_TIME_MS); //* getPoisson(3)); + if ((System.currentTimeMillis() - startTime) >= RUN_TIME * 1000) { + stopClient(); + } else if (!socket.isClosed() && counter <= QUERY_SIZE * BUFFER_SIZE) { + sendMessage(); + } else if (!socket.isClosed()) { + missed++; + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + + } + }).start(); + + + try { + + //send the message to the server + out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())), true); + outputStream = socket.getOutputStream(); + osw = new OutputStreamWriter(socket.getOutputStream()); + + System.err.println("TCP Client: Done."); + + //receive the message which the server sends back + in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + + //in this while the client listens for the messages sent by the server + while (mRun) { + serverMessage = in.readLine(); + long receivedTime = System.currentTimeMillis(); + // System.err.println("SERVER MESSAGE: " + ((serverMessage == null) ? "" : serverMessage)); + if (serverMessage != null) { + int id = Integer.parseInt(serverMessage.split("&")[0]); + // System.err.println("ELAPSED TIME: " + (receivedTime - sentTime[id])); + delayTime.add(receivedTime - sentTime[id]); + synchronized (this) { + counter -= QUERY_SIZE; + } + + } + serverMessage = null; + + } + + //System.err.println("RESPONSE FROM SERVER: Received Message: '" + serverMessage + "'"); + + } catch (SocketException e) { + System.err.println("TCP: Socket closed"); + } catch (Exception e) { + System.err.println("TCP: Error " + e); + + } finally { + //the socket must be closed. It is not possible to reconnect to this socket + // after it is closed, which means a new socket instance has to be created. + socket.close(); + } + + } catch (Exception e) { + + System.err.println("TCP C: Error" + e); + + } + + } + + public static void usage() { + System.out.println("Usage: siriClient serverIP serverPort runTime [querySize responseSize delayQueryResponse " + + "minPayloadSize maxPayloadSize intervalTimeMs bufferSize]"); + } + + public static void main(String[] args) { + if (args.length != 3 && args.length != 10) { + usage(); + System.exit(1); + } + String serverIp = args[0]; + int serverPort = Integer.parseInt(args[1]); + int runTime = Integer.parseInt(args[2]); + SiriClient siriClient; + + if (args.length == 10) { + int querySize = Integer.parseInt(args[3]); + int responseSize = Integer.parseInt(args[4]); + int delayQueryResponse = Integer.parseInt(args[5]); + int minPayloadSize = Integer.parseInt(args[6]); + int maxPayloadSize = Integer.parseInt(args[7]); + int intervalTimeMs = Integer.parseInt(args[8]); + int bufferSize = Integer.parseInt(args[9]); + siriClient = new SiriClient(serverIp, serverPort, runTime, querySize, responseSize, delayQueryResponse, + minPayloadSize, maxPayloadSize, intervalTimeMs, bufferSize); + } else { + siriClient = new SiriClient(serverIp, serverPort, runTime); + } + + String mac = "00:00:00:00:00:00"; + siriClient.run(mac); + System.out.println("missed: " + siriClient.missed); + for (long delay: siriClient.delayTime) { + System.out.println(delay); + } + } +} \ No newline at end of file diff --git a/src/mpExperienceSiri.py b/src/mpExperienceSiri.py new file mode 100644 index 0000000..e12acbb --- /dev/null +++ b/src/mpExperienceSiri.py @@ -0,0 +1,81 @@ +from mpExperience import MpExperience +from mpParamXp import MpParamXp +from mpPvAt import MpPvAt +import os + +class MpExperienceSiri(MpExperience): + SERVER_LOG = "siri_server.log" + CLIENT_LOG = "siri_client.log" + JAVA_BIN = "java" + PING_OUTPUT = "ping.log" + + def __init__(self, xpParamFile, mpTopo, mpConfig): + MpExperience.__init__(self, xpParamFile, mpTopo, mpConfig) + self.loadParam() + self.ping() + MpExperience.classicRun(self) + + def ping(self): + self.mpTopo.commandTo(self.mpConfig.client, "rm " + \ + MpExperienceSiri.PING_OUTPUT ) + count = self.xpParam.getParam(MpParamXp.PINGCOUNT) + for i in range(0, self.mpConfig.getClientInterfaceCount()): + cmd = self.pingCommand(self.mpConfig.getClientIP(i), + self.mpConfig.getServerIP(), n = count) + self.mpTopo.commandTo(self.mpConfig.client, cmd) + + def pingCommand(self, fromIP, toIP, n=5): + s = "ping -c " + str(n) + " -I " + fromIP + " " + toIP + \ + " >> " + MpExperienceSiri.PING_OUTPUT + print(s) + return s + + def loadParam(self): + """ + todo : param LD_PRELOAD ?? + """ + self.run_time = self.xpParam.getParam(MpParamXp.SIRIRUNTIME) + self.query_size = self.xpParam.getParam(MpParamXp.SIRIQUERYSIZE) + self.response_size = self.xpParam.getParam(MpParamXp.SIRIRESPONSESIZE) + self.delay_query_response = self.xpParam.getParam(MpParamXp.SIRIDELAYQUERYRESPONSE) + self.min_payload_size = self.xpParam.getParam(MpParamXp.SIRIMINPAYLOADSIZE) + self.max_payload_size = self.xpParam.getParam(MpParamXp.SIRIMAXPAYLOADSIZE) + self.interval_time_ms = self.xpParam.getParam(MpParamXp.SIRIINTERVALTIMEMS) + self.buffer_size = self.xpParam.getParam(MpParamXp.SIRIBUFFERSIZE) + # Little trick here + self.xpParam.defaultValue[MpParamXp.AUTOCORK] = "0" + + def prepare(self): + MpExperience.prepare(self) + self.mpTopo.commandTo(self.mpConfig.client, "rm " + \ + MpExperienceSiri.CLIENT_LOG) + self.mpTopo.commandTo(self.mpConfig.server, "rm " + \ + MpExperienceSiri.SERVER_LOG) + + def getSiriServerCmd(self): + s = "python " + os.path.dirname(os.path.abspath(__file__)) + \ + "/siri_server.py &>" + MpExperienceSiri.SERVER_LOG + "&" + print(s) + return s + + def getSiriClientCmd(self): + s = MpExperienceSiri.JAVA_BIN + " -jar siriClient.jar" + self.mpConfig.getServerIP() + \ + " 8080 " + self.run_time + " " + self.query_size + " " + self.response_size + \ + " " + self.delay_query_response + " " + self.min_payload_size + " " + \ + self.max_payload_size + " " + self.interval_time_ms + " " + self.buffer_size + \ + " &>" + MpExperienceSiri.CLIENT_LOG + print(s) + return s + + def clean(self): + MpExperience.clean(self) + + + def run(self): + cmd = self.getSiriServerCmd() + self.mpTopo.commandTo(self.mpConfig.server, cmd) + + self.mpTopo.commandTo(self.mpConfig.client, "sleep 2") + cmd = self.getSiriClientCmd() + self.mpTopo.commandTo(self.mpConfig.client, cmd) + self.mpTopo.commandTo(self.mpConfig.client, "sleep 2") diff --git a/src/mpParamXp.py b/src/mpParamXp.py index a20c47f..6bf2804 100644 --- a/src/mpParamXp.py +++ b/src/mpParamXp.py @@ -6,6 +6,7 @@ class MpParamXp(MpParam): WMEM = "wmem" SCHED = "sched" CC = "congctrl" + AUTOCORK = "autocork" KERNELPM = "kpm" KERNELPMC = "kpmc" #kernel path manager client / server KERNELPMS = "kpms" @@ -38,6 +39,14 @@ class MpParamXp(MpParam): NETPERFREQRESSIZE = "netperfReqresSize" ABCONCURRENTREQUESTS = "abConccurentRequests" ABTIMELIMIT = "abTimelimit" + SIRIRUNTIME = "siriRunTime" + SIRIQUERYSIZE = "siriQuerySize" + SIRIRESPONSESIZE = "siriResponseSize" + SIRIDELAYQUERYRESPONSE = "siriDelayQueryResponse" + SIRIMINPAYLOADSIZE = "siriMinPayloadSize" + SIRIMAXPAYLOADSIZE = "siriMaxPayloadSize" + SIRIINTERVALTIMEMS = "siriIntervalTimeMs" + SIRIBUFFERSIZE = "siriBufferSize" # global sysctl @@ -47,6 +56,7 @@ class MpParamXp(MpParam): sysctlKey[KERNELPM] = "net.mptcp.mptcp_path_manager" sysctlKey[SCHED] = "net.mptcp.mptcp_scheduler" sysctlKey[CC] = "net.ipv4.tcp_congestion_control" + sysctlKey[AUTOCORK] = "net.ipv4.tcp_autocorking" sysctlKeyClient = {} sysctlKeyClient[KERNELPMC] = "net.mptcp.mptcp_path_manager" @@ -66,6 +76,7 @@ class MpParamXp(MpParam): defaultValue[USERPMSARGS] = "" defaultValue[CC] = "olia" defaultValue[SCHED] = "default" + defaultValue[AUTOCORK] = "1" defaultValue[CLIENTPCAP] = "no" defaultValue[SERVERPCAP] = "no" @@ -91,6 +102,13 @@ class MpParamXp(MpParam): defaultValue[NETPERFREQRESSIZE] = "2K,256" defaultValue[ABCONCURRENTREQUESTS] = "50" defaultValue[ABTIMELIMIT] = "20" + defaultValue[SIRIQUERYSIZE] = "2500" + defaultValue[SIRIRESPONSESIZE] = "750" + defaultValue[SIRIDELAYQUERYRESPONSE] = "0" + defaultValue[SIRIMINPAYLOADSIZE] = "85" + defaultValue[SIRIMAXPAYLOADSIZE] = "500" + defaultValue[SIRIINTERVALTIMEMS] = "333" + defaultValue[SIRIBUFFERSIZE] = "9" def __init__(self, paramFile): MpParam.__init__(self, paramFile) diff --git a/src/siriClient.jar b/src/siriClient.jar new file mode 100644 index 0000000000000000000000000000000000000000..6c8009b5a9eb11159b17e57a1cf53d1702ad9fbd GIT binary patch literal 14079 zcmbt*19+y(vTkhKwr$%^Cbn%SlVoDsp4j%pwllG9bCNrA);?#}UU%VP_JpDg^ zS64q@)%$(jU2iE!1B0LdKtMnMQ0qiX0Q^Z%06+j_MO6f8CFR8EWd-FV#YB}==w!v- zM*#p-Bio^X1Yty9{onfDx z4sdM=vw9DiC`1#9@3kgc{O^YJ%t&fu)Ne`XzCdY(+`Y1Q5lvA&>T9P&8)mIbIFO=y zMYCyscaVR##M=OYoCQ{p28U=cz%zCFImN7>ABCS2hXMeA0QkQq4)XI~J4ZA6f8PV< zA3aQrtS#)FO#Y@P!e4q?8aw|@Z_Izu+rrk_#L?Ek`ft=I{!Y!(z|q3?=a;pefw76> ze~j|4{l)d)7)44_O4!cU$=Sfx*@@1`+Q7*vD}L2>P5>eJ!$!_F{SE{c6sn;?Kuba! zPDoWyB6+tV8i%_s%ih8UiP~jvW!Tl7zACn53(%dSpF=k2OKfb*^17YtLC*5>^Y!hj z+n&PEDN-`VM&OLPb4ceSrR%^51>!dXhp&FGJAvZ50ulH(aoAxtjkF!x;FHugvM^L@pY?mgl`eE^d(rr{m~PThmEie}?I)6d^D z3^nuoSiz&gK8c639U9=coB%M5q4xDBC*o=jp_{#32xG>uM zXoF2oRy0IVGa2X6@@WdG)8cJXftoOrA4G!JKTB_zQH<|5y`Ru-5hkUzF;q-yW=l2< zbyG}*uQAJtHsgAB`T>t_5Lh*uHOt|OrbHE$;pMsKr&{Sv?0ZrBhW8;%KvycX|?D0EMl=&i=g zyEJk-T-N80a%Mi@+id+ccrx>qZQgkOgD0@sP z+0y>~qfY8O+DkDa8#Hj9lh_cJ926&03dv!M+sh)!d5?2`cP0LyhXf{J1LLg#vEHqw z-8i`Zv<5Zbw+;EBzAe^7q!l)XuRUGRR?|;puEaykkcOE9x}o?{~Td zbp@t4zqJtNt*eSI_#a6FX zhPW5xzr6ZRccZHh2mmnp)3d+soB!IY-2YRr{=<=gV4ju?eo8D=|U8!1b$Kgi1-avT*84YRl(Hx$y=$W2;H{d;h zLaAn_>075zz$iIKyJ{raY0FuZQboyA*Z1wOxqH}?=(2UQ_)$csfGF9|QlHVZ=Vdy3 z2!|QnojyG0SQQ2(#?ptRj`1Bi&9QE7Kf8zQIJL{=vkbmVRNy{E2|`I)yd@*+kcE7U zO;`bH4vMmUSa&>SX^q9nm-J<8crtP4v57b>4p}WVI3Z;c?Zy;n?KmeC-sA?@M}cU8#_kCN`9X^*oUa!YUN?ElA8T1^2_f z0DNc70af*0gc)0v`bfZahrUGtpybF9Xvn%V=65D?n^ z!pkDeyp;Bkuu~}a^Yf)8;$H+veQYGBlYUg8H#KPZmswytC8wj%pqqP?6~~D$f1?)K zIa(QJn+VTnTQ3?|3-sxZA*f2I=+xc1>Bd>KZ2zGirV3??pumeL?It{5V#vB^|MhFT z(-FFQ@=W@2ynHB_R4p8j>eudr4}LMKMI+!{eWzt^@sIRtg?U=i z71c8laOgNTE~!2s*k=(l*}_UzN!WH3=llrY_OQDZ89dX(;o%Dk!59I`Jexaj8o_FT z9#uk6FA%zkC|x+}x1+xLeZg#Eg$!B+%-W(4!GgjTjUcjTwd1|7-;9N1L~%UH0u&bo z>~z$YdEViB6}oH2a}1OR7;F(0z>r+q*@SGcZ}h*jrxY_EjwE{2fvQs>VUSiana%0a zre+moYc)ZEpOF>Tp?HwW=quylKa8(rmD6jf%wd(d<3CWlXLp-8jg)Zstq$XUF?mAE z_R^`EaE{0@WoB08_6o-h1lkYnuSH2 zgea`{+_0C)QDYUA+1k;Ft&^I7wS?3ixC?D^Wd-j$x5+bi@@lDUthe%Ln21^0$_-eG zuk^rGyXL|%R5cEDJC-A|s28jwpUkS1Tw)NeL|d&qcNs2X>HxG;x*DtqN&}+Wh-`*P z)R0L~Q%6d{jOoLOjkKmkfFO3?Bd=ov+V|94aUVYAZ{-+(0+{+e2Z1vH*kEY@TIpQgPzzP~)@np@m!>byPVA?P*JlGO?qf(15r@@Nwm4 z`n++n%7NtN3ZGyD7@^ATR4s?zWg1WNoA?# zH2I_i|C8KL%K}MT%s^R+eGW$!QEXwxuwyn8EWz^_YO=|Itp1p_7A0x^I+gf21Ba;I zbcf+7MxO?Z>YS*)I*#tCUPWkdPW>1uClIw>kF#LWurf8lgHLVHh&Y(2U;@2a)J;pt z>Cf^^AaklQCUK&;p^DGBLZQb_+pA{Q_;RE8$ujvd2@Z5~B6}pks>$YDpg;Eb7DS2^ z)Hio0 z)P1vWyyWoQw<@#~HBwYn92i7lTmM4ZBee;l_8^yA*=)Fq(ew$m5J7gki1F-#C%Uk8 zY<+n@+)Q?4?HDrg=tOugsg0Cq?>4Csik$yPE7KCd>ocVj;t9 zT5o04fa*Yia!}@uSDhJD(!@sTDWm~#9Hb#(vJoSpcJL`j8=h4y5N`|ME2kSeKRTQN z?8U(u<3qLs%MMREkH{mE=+2izqc7;OA=f}1u@ouWFt$Uu4NVx2yQ1!l!c8EI9U%0D z^N!X-S<{$d!t);J``+^%DD-_}oa}O&2ivHTnR>n`rbHxPvZ6=PPU00gQ==sYsFmIC zqjx3dvj$H=r4L6Xe4y;2*fRSzAS+Hu4ehLQx)CuOy|8_dnqRRZ0s>Fnp&oOD_{H%L zB=TMGqy|O?cl1@SIrto*tiJR=;!y4DK^dWvE4v>tgnuFTG(WBA-i8Lp?U;?FpGdHd9uVbwcU4cb%kvDM6ALGzp8 zc3T-Dw!CaRT2E}X=K{E$!h57m)(&a&9+6CZ?`}(3;&B{|{^XuRSKlPIgLg}+sjExn zwqLQ)QP(1nm`;bV(N*z#WW}su1mU0I1LHahmu|Cu3 zbG-W+#=De%>zFey&%x`**yk`RM=l0DSR=4x5COxwb4c}O9%0N%)z}z0oyFV1&jfa} z-KG4H{1^+{>l!WtvMlN8;-uPp;?JHScK|$&?t~M1c=!PNwH8}R76c-O1OWJq^ml79 z{{O~W?AP*IRY~UOme1D2$l1cq_8)sbWi9171%!7v6d*!SN`(%sy+v#hL{#%{0)V2x zk=k5+MiTN?T!hCYrMq@lOGl9b#LO>8k^G|$@|se4`vW1FXW8pN9#(nVKfk{}g~^TU zOACSb{_;Nl^2~mEiKYERKvJ3Ov_~0sm8#ywin}TW)kLe2HWnJ3gj)(3%`5K84a6% ztRqqt*Pm!6J`>@Bjg)Z432UnmRQs^Fi}t%C*)qU@z88J9wPvRQQQmnE)+v2y?;N`) zwlw0&W96BGP18LYzy%e?NOpiXLNbJrVg?0$WRzopuwO?RfXg8Sg(bt7qcibj7@2PD zYUZmaFv5a_yk@p^$yBvfIMo4_EI2xc@*k~1umiRFW|5#+#Hm6$&J@|;V+#s@70ZEe ziF?7Ry0kU6p}~%1Q`dAE6DSVjh8<*mqBi3YKs{0NA^z!)N`8IZ4`6^q7Ucr2G9fz7%YCD|W>n zkn0tAjssLqV34FL4z0Y^2mz5G)*K!}MuJEXe1tf3_H*KeP=O9SQXvr1T~?nPIQlh! zS}Epf4Gg5zGz2m87N?)@;NlCe+BQQmcS=!OxBzfK0ZTHUXj=C#XZxs_Zdi)Oj4*y=d_;77|!Ty=E<(*c+Cyx4ZLxmPrRO-E2MZqRt3ZP|$=xn~tKysI~oI zt<)%en$xJ=HI-OfHtUNdI%F+pHPMbt=c*sr7X;bD2pxhuj3WW^yk`A&!&=uG+Ba3J zRm4E!9%?PQIIXysFh;bCWkAADPyFAQPw}d$);OnJL^&IHQ)0OcJ~Krg-+7%-#vAR* z`%*-dsQ|&IK%%40K8(L9&3!fSySmbg=cz;*Le)-4c&SNR){{xITw8uM572FV6R45F z;NwSyA9i&szI}F9974$JGm}$x@D4zP?F)o9%LB6|9O#1(8iW7h!o5bUE!Ai$V>d$b z6^v;ZsV!i|H5UgdOTKw$7h}iDxb`sph~r^n1_h&-IGl)m|d48df}(pViYy zd+%U;Vq^YV9m~wIW)zIl z*i$^bHlgBj6Al)C_&l8uFVBG|yg{f%;`?Y=V@gewaq4z@;4OQCxt0$@%6QTeg$ujd zb!Qc*pX;X+FWZ4-A0wraLyFnhVB0xgGh}~9FM2|eU(nB%I1UbmmS zOaPVHtt|DSNY==VGP5?MQl|ZbMP)U?>;(6?xJQ61|F{ee?R_>*7&KgGlwEQY2#Mv$ z1)xFcrJ>A=D(t|@*>VuiPxo8DQsV<4^`+q^YkJBs`gbuXZWOI<7p{Hr4fSH&oprE0 zW-O8sjCy&8(J}4y3ZMQ(t-a3Nr~DT@>Xh&JvzqT>+70u3dGXz`u;hszQkdU~+kEmV z44_SLrc;$NkS&o_=M7J|?-DQ+@ngm$%aumwle(AASF0 znYUoijK}!%G34_zU-A3M?LqsWvoJr`fB?!Cjuyh!7ACgNB#eIqoX^jI!xly1MYgMZ zFva1fqul>$<5$qAjKvNrl~>rn3TIujzG&4>LeOi<8{WJgS-H*%q)G%|@&N!j!}mZq zl_X_(?=Oz`MEV9H(#i~1Hy5&^dA`Q%HreWQG5&skSv zl>wv9ZS8X1aBK8@C#sUWH$qm1aZk+r8+~CrDnxK;%dR-|VQ3*!eyhlniQdaKYOsv2 zcmA{?gVcJHgg@bg0g(lvKa4n(MzDx3sWDY2O5^)vG6c48l6K)~tKb-{4=TQ2aJ*@1 zY2=dpY*-CFZAbW$3=Ve#`r!}-8aGb$E^M$yHGXhI!S-cn89qpO_}-EdM|1+29D_Qs zZnQ^-sJ3gRVpVgMDjT+IV~2(dMyky=E={^R2g$-wV6Tl+X0Ju?2n9mEIOyRL6zzcC zmb#Mnh-ZCKDGku(K1VrEO@7NN_GsA}(GX`vY@Xs$JGCKIN4`lxXhd`{oyPnqyiAAu z)eh`kYCO+od`#niQU>h7NZvsi7J;7q9jqc;w;msCwuc zXjB~fM54C)hqbi^iC|v8sW*)AyXT|7>WnxjLz;Q!(HoJweru1KzCTJV$o~%Ol2WQ+ zbvNzRdE>}b#!|^yW_!6+qaoLf?`gR3Nzhbfug;LDfQHxcd&;URqFdYiGp9Af`2J90 zW~$}t;wQgVrSKY*7DW;wF0FZVPqLmm4HwcQN4e>Q0MR3sC_1GI1gqom%Pnbn##{Bn zaVnXql7*?buZYF9LNQyYQB#E6#s(1#NANs2TSp7LXLP)Pp99!0;jh>T$p;uFr^wzf z);{(LXQ?)Z(s)@yn8#i@J^+eU7`g`tT|J&&8g7q1-cG&Lai;i-0`37v2i<_ZYeS6B z&-kdoF+US=kN!CKxMWPS+`;qB?$G$U|I{{%l(EKee*ugoEFI+^W} zt#h_K+6Y(AN;<76+4lqR*8?OoF@YBIzk#!8XF4>Y4l{eGaHSzYuzy`|DDIF)=zcD+}=@BYfcB_Y0PE^IhZ~^7jxox z@FA<<^C=ClVav;XpG{1vOU9O3%5R0;B zm`!QB6X2O|FA<#>DKuix(kg)$8(FEtbNpgjT|1RNEWfINA3o4?VGDa781*E8GFCrC zTP??I=>tNh69K#IBZi)^h>8g@G(pjD3_V~-Ha{8-vt`h+n5|%}$gE4)Nr?r7-eeMF zKb2NGaN>Vq%WMgzB|}`Y=yG;}AG&)k1N@0v%7LwLV(dEZJ6@4S0$nkj(;$&fyNtQA z{&mpAQ@$E2X+*j3wt_e9+=vNs7Fq0DwKuVDF&PpIZ=$TUb6IX7M@NPC*NoK3>C)ID z$VQ>!mm!IGu4@rT6vg@yvD_7%Pi9w zNuh-kXR6j9&Eh%hh?ht?EY2X!>V&cM@}&_Y#VD?QO_h4NHqpi-mVx{g=X6U&Y?smc zl3SP2=8_36)YuHlYCaS;Z15MZ3G%;Z8lQSDK>oVkEp9$ZNw8s*GnKF2pvc-kH*9p^$tH+ampv^G!!Zu1%2 z*-z1B_Vzj@%}X$&Did(Tmd=zp^iv3Mbqi7HmOyn@h)xq3ssM8cizWJZjijvB zj#d`*?^df5@xLIZw_#6wyEg3$hZfh!<;k5%f*7gYEk_udai>74iV!1Kgx3+2?hooJ znOkt?%%+C{^tadM)yP4FY8KU;U;x@Q+E%P5j5+hi%pmp)lu(Z@rGZWvRA<-{&Ib@0hsTjEV|Vk>D%N_& zv)rqpL%n?Q<+M*M23s%USS6vuH?hs2F4;tk6WH)!4&K~3MNfo-HQLyXdD!88Bbfu4 z$2s90FyUTC2c0!TjHXobNcht`2;48Nk?+3uWT*+}sSGCNg_E8#B0O;BSuz>UgZL1; z3B4`v{gF2$^2aJta*VsoT)fi!ft5Rm5wISOXCQ9AVv%>5a_BSiiK4(r1yiTam^yS9 z*G^f!g7hUU1v|h7II*~0NcHdO7P>D3m6ita5!Fa}^e6MuKDM4|zU4F2GR2y7UkZq} zQIVGfPhhcT^rP;hDBf{@;4PWkt*b3)A`4(DYV#W;Oms*!c&ZjJ9l4wdsHj{F{gC|1->a5D%ANHtbJ-2#MRH78P=Y4lm9a5*>?5hhx&95f z7T1>JO3&p5`-ct@Eml@+-ud`RKD>QKTXJ$y^5|^QL({=S)9hkXa$8xj66x^Z30_&(1+laX3Nte7;_0^K zEUx5@YT^y&p<)jZnU(r#1j_W$mqD|PDNr#dBIQZ8AkbV zctz%s1;{Lto-$@hyQ48QqqBm~_0}PWN}}hKDXQ(?jA)l#Q%ds4Cq;1OjMjr1?h+B? z-x#-r`tia;I=N>LI>nunSDkosr3kW*uBa=^Z{BW_Z`KTAOLDMp|gcMPe{o ztfYsaUlC=YOXE>M44e?hz00}ltehxir9jHb22D?i7?wt%9PV>fD`+sFz@>X-w3%GU zWKAJGthpE9wqRE>8WxRL&w8*{3#K7sA4M+Ps*^~t&OrC0n4Nk@9~GQ47{(W{R!v@g zOZ*gILSPD(?v?|H4pZn60E{m|LV8brSD>myyv5c_V^r3QXGXjop-Ks6Mq!GZ^yLb= zG@a)w$l6j44KOv?B{tcDux4l-@@;W>ejDt3& zoCh~{8D&(n%RRf8C1`a)(Q+hFOgmFJI02+5o@}l-K6S<+&9!~3*B;Njn}^9UDl6Q! z+yq#UHlw7IYKHlNm026{A$w z6vy9D zgGN|_kyXk9bCP`QtLE%(+uizLeJ20Bl$$=7AR%f44XwfIzuJ z2uXR{1l>EI5`=D3-8|Kev?>r>8%CoF6hSy>t@b2P%a@#6V*XQM`>;2)3(g!jZhuBzDSVKsIkbRl~N02X&M3q zp52ySodZ+h^&jg4{Dy6vI%L$+MwELLjo8-*v$bp;^R~Ux%>|xUvv+VE&W0a4>B~MT zrVks1X<2T^>YG0PZ=EK?mwcio2`y=sUC+f_{Ib$Brq@%>@0=e%UgO^g+eKAzx)RAK z_6$)TK_k#VnZCLI4saG zCV~zhGK{UILy_KX1T-u*lddaVhQ)>YbE5BuUunYZ6hLo?W3hDh)V>bkIh`vu4^cjF zEa2UqF>c~P&!>`hS(G}oz0i@-OR_5!9GGrx%4E_xGbWWdpk)Q#C!qR3y(dxm)a`*& zvreiJL*kSp))b1duv>y~lp{uQzsm(xKxu|{MTENJ+Ga#7`+hLcKA6GJFg@3=eU4Of zg_+Y3Rym%6=S$}nDQ9d%Q$B=IOsbh5&f)raBjj4*w+aP%PY6EZ=~`kGf%UbUL|}dX zHc7a$^=K(PfAZKCT(W-Y0zX66wZtrfE1qVZ_L3FU@YtnUuW!Znot*mUjuI zDMWq!NTVs5?K}NmQl(hh3ew=$dqdMYh@JWfX^TArX4to@^j+Ha=VJ8 zHMxy=aVIQl(yN!=aw(RC@DJX7M*Jln%Q*GyW}jNh&B<8HW4y4?Y3i=-6_!O z9%!$KW>99jvJ<d4QhPZ)uC(tf3b zm-sG8U_&GN40Vz^i)W5eNN?38*{JC2+2E=e{}mT*&o$RYkMc!OjgLTNgg;!4aD!d| zMHhtbXN∾u%TSd>TyC-^KUAJO=B*#qa-!is!_2qZ5%WTrikeG|K zAm5qC+e&?9Cpp%zNRV302jPv&ZsvnUhYibvVU{|dxzXUz$5qy$dsC(a`%(Rve$6~x#AnJk z@qqW@{tRU8^UIR?=vQvMd@Q?3*W4s|FU#16-Am+?22{?iwl8ldiyJs0E}<^=+aD%) zY!Y+b`51D}?_n|Xd&BXoV(>6Z+o|Dvw1OX;;o>|}m$Q0D#2uR|blnN4SI(@13Z8ml z*HS18OrFmGdeZpI7tRWRKNch?^n{M^06e`xt3D&xKeGz#ENvTy9C1yDY)QEF*hJ6T z+(E6&+07zmx`F?Qx+s7@#(PNxLtNe5(T>!ex2;#8o-Dc~rsU}dl(Gls9nYdT6uP6p z${ykni@7Dms1|GE7m&ZXch0mcDt>bHbcJz!4#hPT-Fj6C`YzKME~cuVBsJC(ZwOzT zQLhgO-tbCoM*ZC^QoC55-~pKC1LVF1jNsdg3%p77mo>$ayz~Ik;PbN@VRbV~ z?`)Go#V}qfmD-^fg4UD6a(3|^|G-a)o@0hd#=eoO{2@s>hUXrEPgAikdJ1f`b;c`f zS%PpbPHOB+6jsD{4zXHTHqucQh7b?>*Bgj*7~7clxA+GMsaW90>ER*IEJgVC27Lh_ zDMppE+fF5_4mWZ+bGmv8X}e>RSG|4n6ua?K91gxF0aGnM>3;-(SK0&Y%2i7)%&Jcf z;2l+cEa*5+J+-~(E}K%VS9ZFb63RZBO-ot&XFAV*p?YJ^?sxV-oudqq~a zqt%eH5uBI;m_aYJ=ymn*n8d4vg0eZrT6Q*lgjS`xuL`Nt>vGjjBZ0r=#CX_^S^0)3 zA_ON_x_|FZ3Comx?eYDz7m}|kg~<>5%uQ*w1#vbJ#x?-PqnKXlf}(264?YX#QS>T0 z9^q-e1p(RO*5vBu&LavPLeL@K=?R(Rom^%7^qQCIM4qZQ%R{LO1c$7wUq`WLRl|_1 zia=v0ca!iUTz*rUJ*kPRf*V+>)P z%J)Lz$}GmgY={xg^(my)D^(cs;vRn&qKK})OFaY?4!EzwE|uOD=Jqh;$+3%yN>{r(BDqtN>#PM~b3sS0ezUScGx9Nz>;-MAuB-c9 zvS68&3KTB|6e)3gBcn-mh<WAqRpW+xc9!#6)uB^0UF2R_uzK%C}P(y1`Qv zVjjj>wg)Frd}SllY)9I`YVtroTGAV@jFf=3tFD6fOsq zP`G_I)`cnIfUT!?CuGqG^Q+onZcU#G;InFt_*5HKn=*9(F}SY2KE>+Io5PJ3=*^%J zaV_X8lhhi7vp0ljuL017<{R+Y@3lji`bkm}z5^leU~g9fc7X+WUOD$+HBq&GA^s4V z3yxMi4jgn7k^ITg9xLFpNVMPJkR~Dmn5mh{23O=TJ?QEx`RlVUrlBp*Wn`Hycg+%XFNAJ^Vfh&Pc}@(4;(uk|U5uedvD zR&Sh46-oEp=H;6f?d3&c#hZm<6FGLYEpN|@;j2Fz9&gXKqPZ&vaCbNxIMs9I9y+?s zOS==IeTtCZC+pEeo`a-@Sl&?uHVCiYvDe0LdQ-N7_=bd4LeeL1M4rp)m;$Y0!+q$u zg}u3Zsw77akL>VdRXeyPJDMTg7i6A0{EM8ZNB!8if>tf?a^z7Z&c-~L zE4OJ?hO|oK=yPS?G*%{8Df(9}*s%fj&YeM_V#~L-!CNzI_wd@X+;S-(XQVq4bo-v@ z9;sG013j^l50;)>BAJgS}HRAY~7gGr>60AFy zLbb!;nkwA;Mya!_UN%EI%vXALotMmcsa&G>>J&nciBoG^g0{mB5@{Z^E2Uh0rRk;{b-P}D6JPx}2;VhCxhrd>-}h9L=U_i}{eDv{ z**qV$$wY56mh+73HOABK?C>HzJ&(^jgEzY4wy^p5iSh_|H`uzluoyS^ix~$*)nM0RDYN$L~>Ee>VT9@A&r`zbZU_%j(bO zpV{btXY1Fw?r#D8$%+26<{$MR|K9Ge3XtEz{4ccsiv|C8llWB;@>?wbZkWH2>Hq4C zKhyPpRfznSn*YYA|98E}U+w&vK>myT|66!}M!;V$;eTC(zs&ysRSNJ|tAD0S{UWFT z7G$vhq18W5^8ae+&zzB8%&Xs$1ONYT=-=?L{%Y*c8SGy)l;864GadN|c;l z{%Y^fnaE#x@!zt6_P?|DPg(LmoBfqd`E#D-SDN#;1pkE5@7&6NCO!YE|L1$qKcf~O b_s{zO=~2+1AprmY_w!ZsGbDwXe*OACHBu56 literal 0 HcmV?d00001 diff --git a/src/siri_server.py b/src/siri_server.py new file mode 100644 index 0000000..b17556a --- /dev/null +++ b/src/siri_server.py @@ -0,0 +1,153 @@ +import socket +import sys +import threading +import time + +BUFFER_SIZE = 512 +ENCODING = 'ascii' + +MAX_SECURITY_HEADER_SIZE = 128 +MIN_FIELDS_NB = 8 + +threads = {} +to_join = [] +delay_results = {} +time_sent = {} +mac = {} + + +class HandleClientConnectionThread(threading.Thread): + """ Handle requests given by the client """ + + def __init__(self, connection, client_address, id): + threading.Thread.__init__(self) + self.connection = connection + self.client_address = client_address + self.id = id + + def run(self): + try: + print(self.id, ": Connection from", self.client_address) + + # Some useful variables + first_data = True + expected_req_size = 0 + res_size = 0 + waiting_time = 0 + expected_delay_results = 0 + buffer_data = "" + next_buffer_data = "" + message_sizes = [] + message_id = 0 + + # Initialize logging variables + delay_results[self.id] = [] + time_sent[self.id] = [] + + # Receive the data and retransmit it + while True: + data = self.connection.recv(BUFFER_SIZE).decode(ENCODING) + + if len(data) == 0: + # Connection closed at remote; close the connection + break + + # Manage the case where the client is very very quick + if not data.endswith("\n") and "\n" in data: + carriage_index = data.index("\n") + buffer_data += data[:carriage_index + 1] + next_buffer_data += data[carriage_index + 1:] + used_data = data[:carriage_index + 1] + else: + buffer_data += data + used_data = data + + message_sizes.append(len(used_data)) + + if first_data and len(buffer_data) <= MAX_SECURITY_HEADER_SIZE and len(buffer_data.split("&")) < MIN_FIELDS_NB: + continue + + if first_data: + split_data = buffer_data.split("&") + expected_delay_results = int(split_data[4]) + if len(split_data) - MIN_FIELDS_NB == expected_delay_results: + message_id = int(split_data[0]) + expected_req_size = int(split_data[1]) + res_size = int(split_data[2]) + waiting_time = int(split_data[3]) + time_sent[self.id].append(int(split_data[5])) + + # Little check, we never know + if self.id not in mac: + mac[self.id] = split_data[6] + elif not mac[self.id] == split_data[6]: + print(self.id, ": MAC changed from", mac[self.id], "to", split_data[6], ": close the connection", file=sys.stderr) + break + + for i in range(expected_delay_results): + delay_results[self.id].append(int(split_data[7 + i])) + first_data = False + if len(delay_results[self.id]) % 10 == 0: + print(self.id, ":", time_sent[self.id]) + print(self.id, ":", delay_results[self.id]) + else: + # Avoid further processing, wait for additional packets + continue + + if len(buffer_data) == expected_req_size: + print(self.id, ": Received request of size", expected_req_size, "; send response of", res_size, "after", waiting_time, "s") + time.sleep(waiting_time) + self.connection.sendall((str(message_id) + "&" + "0" * (res_size - len(str(message_id)) - 2) + "\n").encode(ENCODING)) + first_data = True + buffer_data = "" + message_sizes = [] + if len(next_buffer_data) > 0: + buffer_data = next_buffer_data + next_buffer_data = "" + message_sizes.append(len(buffer_data)) + + elif len(buffer_data) > expected_req_size: + print(len(buffer_data), len(data), len(used_data), file=sys.stderr) + print(self.id, ": Expected request of size", expected_req_size, "but received request of size", len(buffer_data), "; close the connection", file=sys.stderr) + print(buffer_data, file=sys.stderr) + break + + finally: + # Clean up the connection + print(self.id, ": Closing connection") + self.connection.close() + to_join.append(self.id) + + +def join_finished_threads(): + """ Join threads whose connection is closed """ + while len(to_join) > 0: + thread_to_join_id = to_join.pop() + threads[thread_to_join_id].join() + del threads[thread_to_join_id] + +# Create a TCP/IP socket +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +# Handle reusing the same 5-tuple if the previous one is still in TIME_WAIT +sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + +# Bind the socket to the port +server_address = ('0.0.0.0', 8080) +print("Stating the server on %s port %s" % server_address) +sock.bind(server_address) + +# Listen for incoming connections +sock.listen(4) + +# Connection identifier +conn_id = 0 + +while True: + # Wait for a connection + connection, client_address = sock.accept() + thread = HandleClientConnectionThread(connection, client_address, conn_id) + threads[conn_id] = thread + conn_id += 1 + thread.start() + join_finished_threads()