From add9ad43c96e910580323dc0b3a6cc48891c2633 Mon Sep 17 00:00:00 2001 From: Quentin De Coninck Date: Thu, 23 Feb 2017 11:36:08 +0100 Subject: [PATCH] mpExperienceSiriMsg: siri with msg background traffic --- src/mpExperience.py | 1 + src/mpExperienceSiriMsg.py | 118 +++++++++++++++++++++++++++++++++++++ src/mpParamXp.py | 6 ++ src/mpXpRunner.py | 3 + src/msg_client.py | 15 ++++- src/msg_server.py | 10 +++- 6 files changed, 149 insertions(+), 4 deletions(-) create mode 100644 src/mpExperienceSiriMsg.py diff --git a/src/mpExperience.py b/src/mpExperience.py index 1853d94..419a53d 100644 --- a/src/mpExperience.py +++ b/src/mpExperience.py @@ -18,6 +18,7 @@ class MpExperience: DITG = "ditg" MSG = "msg" SIRIHTTP = "sirihttp" + SIRIMSG = "sirimsg" def __init__(self, xpParam, mpTopo, mpConfig): self.xpParam = xpParam diff --git a/src/mpExperienceSiriMsg.py b/src/mpExperienceSiriMsg.py new file mode 100644 index 0000000..bfed0c2 --- /dev/null +++ b/src/mpExperienceSiriMsg.py @@ -0,0 +1,118 @@ +from mpExperience import MpExperience +from mpParamXp import MpParamXp +from mpPvAt import MpPvAt +import os + +class MpExperienceSiriMsg(MpExperience): + MSG_SERVER_LOG = "msg_server.log" + MSG_CLIENT_LOG = "msg_client.log" + MSG_CLIENT_ERR = "msg_client.err" + SERVER_LOG = "siri_server.log" + CLIENT_LOG = "siri_client.log" + CLIENT_ERR = "siri_client.err" + JAVA_BIN = "java" + PING_OUTPUT = "ping.log" + + def __init__(self, xpParamFile, mpTopo, mpConfig): + MpExperience.__init__(self, xpParamFile, mpTopo, mpConfig) + self.loadParam() + self.ping() + MpExperience.classicRun(self) + + def ping(self): + self.mpTopo.commandTo(self.mpConfig.client, "rm " + \ + MpExperienceSiriMsg.PING_OUTPUT ) + count = self.xpParam.getParam(MpParamXp.PINGCOUNT) + for i in range(0, self.mpConfig.getClientInterfaceCount()): + cmd = self.pingCommand(self.mpConfig.getClientIP(i), + self.mpConfig.getServerIP(), n = count) + self.mpTopo.commandTo(self.mpConfig.client, cmd) + + def pingCommand(self, fromIP, toIP, n=5): + s = "ping -c " + str(n) + " -I " + fromIP + " " + toIP + \ + " >> " + MpExperienceSiriMsg.PING_OUTPUT + print(s) + return s + + def loadParam(self): + """ + todo : param LD_PRELOAD ?? + """ + self.run_time = self.xpParam.getParam(MpParamXp.SIRIRUNTIME) + self.query_size = self.xpParam.getParam(MpParamXp.SIRIQUERYSIZE) + self.response_size = self.xpParam.getParam(MpParamXp.SIRIRESPONSESIZE) + self.delay_query_response = self.xpParam.getParam(MpParamXp.SIRIDELAYQUERYRESPONSE) + self.min_payload_size = self.xpParam.getParam(MpParamXp.SIRIMINPAYLOADSIZE) + self.max_payload_size = self.xpParam.getParam(MpParamXp.SIRIMAXPAYLOADSIZE) + self.interval_time_ms = self.xpParam.getParam(MpParamXp.SIRIINTERVALTIMEMS) + self.buffer_size = self.xpParam.getParam(MpParamXp.SIRIBUFFERSIZE) + self.client_sleep = self.xpParam.getParam(MpParamXp.MSGCLIENTSLEEP) + self.server_sleep = self.xpParam.getParam(MpParamXp.MSGSERVERSLEEP) + self.nb_requests = self.xpParam.getParam(MpParamXp.MSGNBREQUESTS) + + def prepare(self): + MpExperience.prepare(self) + self.mpTopo.commandTo(self.mpConfig.client, "rm " + \ + MpExperienceSiriMsg.CLIENT_LOG) + self.mpTopo.commandTo(self.mpConfig.client, "rm " + \ + MpExperienceSiriMsg.CLIENT_ERR) + self.mpTopo.commandTo(self.mpConfig.server, "rm " + \ + MpExperienceSiriMsg.SERVER_LOG) + self.mpTopo.commandTo(self.mpConfig.client, "rm " + \ + MpExperienceSiriMsg.MSG_CLIENT_LOG) + self.mpTopo.commandTo(self.mpConfig.client, "rm " + \ + MpExperienceSiriMsg.MSG_CLIENT_ERR) + self.mpTopo.commandTo(self.mpConfig.server, "rm " + \ + MpExperienceSiriMsg.MSG_SERVER_LOG) + + def getSiriServerCmd(self): + s = "python3 " + os.path.dirname(os.path.abspath(__file__)) + \ + "/siri_server.py &>" + MpExperienceSiriMsg.SERVER_LOG + "&" + print(s) + return s + + def getSiriClientCmd(self): + s = MpExperienceSiriMsg.JAVA_BIN + " -jar " + os.path.dirname(os.path.abspath(__file__)) + "/siriClient.jar " + \ + self.mpConfig.getServerIP() + " 8080 " + self.run_time + " " + self.query_size + " " + self.response_size + \ + " " + self.delay_query_response + " " + self.min_payload_size + " " + \ + self.max_payload_size + " " + self.interval_time_ms + " " + self.buffer_size + \ + " >" + MpExperienceSiriMsg.CLIENT_LOG + " 2>" + MpExperienceSiriMsg.CLIENT_ERR + print(s) + return s + + def getMsgServerCmd(self): + s = "python3 " + os.path.dirname(os.path.abspath(__file__)) + \ + "/msg_server.py --sleep " + self.server_sleep + " &>" + MpExperienceSiriMsg.MSG_SERVER_LOG + "&" + print(s) + return s + + def getMsgClientCmd(self): + s = "python3 " + os.path.dirname(os.path.abspath(__file__)) + \ + "/msg_client.py --sleep " + self.client_sleep + " --nb " + self.nb_requests + \ + " --bulk >" + MpExperienceSiriMsg.MSG_CLIENT_LOG + " 2>" + MpExperienceSiriMsg.MSG_CLIENT_ERR + "&" + print(s) + return s + + def clean(self): + MpExperience.clean(self) + + + def run(self): + cmd = self.getSiriServerCmd() + self.mpTopo.commandTo(self.mpConfig.server, "netstat -sn > netstat_server_before") + self.mpTopo.commandTo(self.mpConfig.server, cmd) + cmd = self.getMsgServerCmd() + self.mpTopo.commandTo(self.mpConfig.server, cmd) + + self.mpTopo.commandTo(self.mpConfig.client, "sleep 2") + self.mpTopo.commandTo(self.mpConfig.client, "netstat -sn > netstat_client_before") + cmd = self.getMsgClientCmd() + self.mpTopo.commandTo(self.mpConfig.client, cmd) + cmd = self.getSiriClientCmd() + self.mpTopo.commandTo(self.mpConfig.client, cmd) + self.mpTopo.commandTo(self.mpConfig.server, "netstat -sn > netstat_server_after") + self.mpTopo.commandTo(self.mpConfig.client, "netstat -sn > netstat_client_after") + self.mpTopo.commandTo(self.mpConfig.server, "pkill -f siri_server.py") + self.mpTopo.commandTo(self.mpConfig.server, "pkill -f msg_server.py") + self.mpTopo.commandTo(self.mpConfig.server, "pkill -f msg_client.py") + self.mpTopo.commandTo(self.mpConfig.client, "sleep 2") diff --git a/src/mpParamXp.py b/src/mpParamXp.py index 8ca16e6..6793595 100644 --- a/src/mpParamXp.py +++ b/src/mpParamXp.py @@ -58,6 +58,9 @@ class MpParamXp(MpParam): DITGBURSTSOFFPACKETSSEC = "ditgBurstsOffPacketsSec" IPERFTIME = "iperfTime" IPERFPARALLEL = "iperfParallel" + MSGCLIENTSLEEP = "msgClientSleep" + MSGSERVERSLEEP = "msgServerSleep" + MSGNBREQUESTS = "msgNbRequests" PRIOPATH0 = "prioPath0" PRIOPATH1 = "prioPath1" BACKUPPATH0 = "backupPath0" @@ -145,6 +148,9 @@ class MpParamXp(MpParam): defaultValue[DITGBURSTSOFFPACKETSSEC] = "0" defaultValue[IPERFTIME] = "10" defaultValue[IPERFPARALLEL] = "1" + defaultValue[MSGCLIENTSLEEP] = "5.0" + defaultValue[MSGSERVERSLEEP] = "5.0" + defaultValue[MSGNBREQUESTS] = "5" defaultValue[PRIOPATH0] = "0" defaultValue[PRIOPATH1] = "0" defaultValue[BACKUPPATH0] = "0" diff --git a/src/mpXpRunner.py b/src/mpXpRunner.py index f9c677b..2f3aa1d 100644 --- a/src/mpXpRunner.py +++ b/src/mpXpRunner.py @@ -21,6 +21,7 @@ from mpExperienceIperf import MpExperienceIperf from mpExperienceDITG import MpExperienceDITG from mpExperienceMsg import MpExperienceMsg from mpExperienceSiriHTTP import MpExperienceSiriHTTP +from mpExperienceSiriMsg import MpExperienceSiriMsg from mpExperienceNone import MpExperienceNone from mpExperience import MpExperience from mpECMPSingleInterfaceTopo import MpECMPSingleInterfaceTopo @@ -128,6 +129,8 @@ class MpXpRunner: MpExperienceMsg(self.xpParam, self.mpTopo, self.mpTopoConfig) elif xp == MpExperience.SIRIHTTP: MpExperienceSiriHTTP(self.xpParam, self.mpTopo, self.mpTopoConfig) + elif xp == MpExperience.SIRIMSG: + MpExperienceSiriMsg(self.xpParam, self.mpTopo, self.mpTopoConfig) else: print("Unfound xp type..." + xp) diff --git a/src/msg_client.py b/src/msg_client.py index 1d7df34..8c20e89 100644 --- a/src/msg_client.py +++ b/src/msg_client.py @@ -1,3 +1,4 @@ +import argparse import datetime import random import socket @@ -11,6 +12,13 @@ MSG_SIZE = 1200 threads = {} to_join = [] +parser = argparse.ArgumentParser(description="Msg client") +parser.add_argument("-s", "--sleep", type=float, help="sleep time between two sendings", default=5.0) +parser.add_argument("-n", "--nb", type=int, help="number of requests done", default=5) +parser.add_argument("-B", "--bulk", type=bool, help="if set, don't wait for reply to send another packet", default=False) + +args = parser.parse_args() + def string_generator(size=6, chars=string.ascii_uppercase + string.digits): return ''.join(random.choice(chars) for _ in range(size)) @@ -30,12 +38,15 @@ sock.connect(server_address) delays = [] try: - for i in range(5): - time.sleep(5.0) + for i in range(args.nb): + time.sleep(args.sleep) request = string_generator(size=MSG_SIZE, chars=string.digits) start_time = datetime.datetime.now() sock.sendall(request.encode(ENCODING)) + if args.bulk: + continue + buffer_data = "" while len(buffer_data) < MSG_SIZE: data = sock.recv(BUFFER_SIZE).decode(ENCODING) diff --git a/src/msg_server.py b/src/msg_server.py index 7f1676f..564221f 100644 --- a/src/msg_server.py +++ b/src/msg_server.py @@ -1,3 +1,4 @@ +import argparse import datetime import random import socket @@ -11,6 +12,11 @@ ENCODING = 'ascii' threads = {} to_join = [] +parser = argparse.ArgumentParser(description="Msg server") +parser.add_argument("-s", "--sleep", type=float, help="sleep time between reception and sending", default=5.0) + +args = parser.parse_args() + def string_generator(size=6, chars=string.ascii_uppercase + string.digits): return ''.join(random.choice(chars) for _ in range(size)) @@ -47,10 +53,10 @@ class HandleClientConnectionThread(threading.Thread): stop_time = datetime.datetime.now() if start_time: self.delays.append(stop_time - start_time) - time.sleep(5.0) + time.sleep(args.sleep) response = string_generator(size=self.msg_size, chars=string.digits) start_time = datetime.datetime.now() - self.connection.sendall(response.encode(ENCODING)) + self.connection.sendall(response).encode(ENCODING) buffer_data = "" elif len(buffer_data) > self.msg_size: