From d56a6542a889906c1d8cb6d1cbd8ffa1cbe1d9a6 Mon Sep 17 00:00:00 2001 From: Quentin De Coninck Date: Mon, 9 Jan 2017 16:03:05 +0100 Subject: [PATCH] msg: new experience to mimic messaging traffic --- src/mpExperience.py | 1 + src/mpExperienceMsg.py | 73 +++++++++++++++++++++++++++++++ src/mpXpRunner.py | 3 ++ src/msg_client.py | 59 +++++++++++++++++++++++++ src/msg_server.py | 99 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 235 insertions(+) create mode 100644 src/mpExperienceMsg.py create mode 100644 src/msg_client.py create mode 100644 src/msg_server.py diff --git a/src/mpExperience.py b/src/mpExperience.py index bbcfd0b..844107f 100644 --- a/src/mpExperience.py +++ b/src/mpExperience.py @@ -16,6 +16,7 @@ class MpExperience: VLC = "vlc" IPERF = "iperf" DITG = "ditg" + MSG = "msg" def __init__(self, xpParam, mpTopo, mpConfig): self.xpParam = xpParam diff --git a/src/mpExperienceMsg.py b/src/mpExperienceMsg.py new file mode 100644 index 0000000..f1064ef --- /dev/null +++ b/src/mpExperienceMsg.py @@ -0,0 +1,73 @@ +from mpExperience import MpExperience +from mpParamXp import MpParamXp +import os + +class MpExperienceMsg(MpExperience): + SERVER_LOG = "msg_server.log" + CLIENT_LOG = "msg_client.log" + CLIENT_ERR = "msg_client.err" + PING_OUTPUT = "ping.log" + + def __init__(self, xpParamFile, mpTopo, mpConfig): + MpExperience.__init__(self, xpParamFile, mpTopo, mpConfig) + self.loadParam() + self.ping() + MpExperience.classicRun(self) + + def ping(self): + self.mpTopo.commandTo(self.mpConfig.client, "rm " + \ + MpExperienceMsg.PING_OUTPUT ) + count = self.xpParam.getParam(MpParamXp.PINGCOUNT) + for i in range(0, self.mpConfig.getClientInterfaceCount()): + cmd = self.pingCommand(self.mpConfig.getClientIP(i), + self.mpConfig.getServerIP(), n = count) + self.mpTopo.commandTo(self.mpConfig.client, cmd) + + def pingCommand(self, fromIP, toIP, n=5): + s = "ping -c " + str(n) + " -I " + fromIP + " " + toIP + \ + " >> " + MpExperienceMsg.PING_OUTPUT + print(s) + return s + + def loadParam(self): + """ + todo : param LD_PRELOAD ?? + """ + pass + + def prepare(self): + MpExperience.prepare(self) + self.mpTopo.commandTo(self.mpConfig.client, "rm " + \ + MpExperienceMsg.CLIENT_LOG) + self.mpTopo.commandTo(self.mpConfig.server, "rm " + \ + MpExperienceMsg.SERVER_LOG) + + def getSiriServerCmd(self): + s = "python3 " + os.path.dirname(os.path.abspath(__file__)) + \ + "/msg_server.py &>" + MpExperienceMsg.SERVER_LOG + "&" + print(s) + return s + + def getSiriClientCmd(self): + s = "python3 " + os.path.dirname(os.path.abspath(__file__)) + \ + "/msg_client.py &>" + MpExperienceMsg.CLIENT_LOG + "&" + print(s) + return s + + def clean(self): + MpExperience.clean(self) + + + def run(self): + cmd = self.getSiriServerCmd() + self.mpTopo.commandTo(self.mpConfig.server, "netstat -sn > netstat_server_before") + self.mpTopo.commandTo(self.mpConfig.server, cmd) + + self.mpTopo.commandTo(self.mpConfig.client, "sleep 2") + cmd = self.getSiriClientCmd() + self.mpTopo.commandTo(self.mpConfig.client, "netstat -sn > netstat_client_before") + self.mpTopo.commandTo(self.mpConfig.client, cmd) + self.mpTopo.commandTo(self.mpConfig.server, "netstat -sn > netstat_server_after") + self.mpTopo.commandTo(self.mpConfig.client, "netstat -sn > netstat_client_after") + self.mpTopo.commandTo(self.mpConfig.server, "pkill -f siri_server.py") + self.mpTopo.commandTo(self.mpConfig.client, "sleep 2") diff --git a/src/mpXpRunner.py b/src/mpXpRunner.py index f5a8f51..aa20209 100644 --- a/src/mpXpRunner.py +++ b/src/mpXpRunner.py @@ -19,6 +19,7 @@ from mpExperienceSiri import MpExperienceSiri from mpExperienceVLC import MpExperienceVLC from mpExperienceIperf import MpExperienceIperf from mpExperienceDITG import MpExperienceDITG +from mpExperienceMsg import MpExperienceMsg from mpExperienceNone import MpExperienceNone from mpExperience import MpExperience from mpECMPSingleInterfaceTopo import MpECMPSingleInterfaceTopo @@ -122,6 +123,8 @@ class MpXpRunner: self.mpTopoConfig) elif xp == MpExperience.DITG: MpExperienceDITG(self.xpParam, self.mpTopo, self.mpTopoConfig) + elif xp == MpExperience.MSG: + MpExperienceMsg(self.xpParam, self.mpTopo, self.mpTopoConfig) else: print("Unfound xp type..." + xp) diff --git a/src/msg_client.py b/src/msg_client.py new file mode 100644 index 0000000..36dccd1 --- /dev/null +++ b/src/msg_client.py @@ -0,0 +1,59 @@ +import datetime +import random +import socket +import string +import time + +BUFFER_SIZE = 2048 +ENCODING = 'ascii' +MSG_SIZE = 1200 + +threads = {} +to_join = [] + + +def string_generator(size=6, chars=string.ascii_uppercase + string.digits): + return ''.join(random.choice(chars) for _ in range(size)) + + +# Create a TCP/IP socket +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +# Handle reusing the same 5-tuple if the previous one is still in TIME_WAIT +sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + +# Bind the socket to the port +server_address = ('10.1.0.1', 8000) +print("Try to connect to %s port %s" % server_address) +sock.connect(server_address) + +delays = [] + +try: + for i in range(5): + time.sleep(5.0) + request = string_generator(size=MSG_SIZE, chars=string.digits) + start_time = datetime.datetime.now() + sock.sendall(request).encode(ENCODING) + + buffer_data = "" + while len(buffer_data) < MSG_SIZE: + data = sock.recv(BUFFER_SIZE).decode(ENCODING) + + if len(data) == 0: + # Connection closed at remote; close the connection + break + + buffer_data += data + + if len(buffer_data) == MSG_SIZE: + stop_time = datetime.datetime.now() + delays.append(stop_time - start_time) + else: + print("An issue occured...") + break +finally: + # Clean up the connection + print("Closing connection") + sock.close() + print(delays) diff --git a/src/msg_server.py b/src/msg_server.py new file mode 100644 index 0000000..d33c254 --- /dev/null +++ b/src/msg_server.py @@ -0,0 +1,99 @@ +import datetime +import random +import socket +import string +import threading +import time + +BUFFER_SIZE = 2048 +ENCODING = 'ascii' + +threads = {} +to_join = [] + + +def string_generator(size=6, chars=string.ascii_uppercase + string.digits): + return ''.join(random.choice(chars) for _ in range(size)) + + +class HandleClientConnectionThread(threading.Thread): + """ Handle requests given by the client """ + + def __init__(self, connection, client_address, id, msg_size): + threading.Thread.__init__(self) + self.connection = connection + self.client_address = client_address + self.id = id + self.msg_size = msg_size + self.delays = [] + + def run(self): + try: + print(self.id, ": Connection from", self.client_address) + start_time = None + buffer_data = "" + + # Receive the data and retransmit it + while True: + data = self.connection.recv(BUFFER_SIZE).decode(ENCODING) + + if len(data) == 0: + # Connection closed at remote; close the connection + break + + buffer_data += data + + if len(buffer_data) == self.msg_size: + stop_time = datetime.datetime.now() + if start_time: + self.delays.append(stop_time - start_time) + time.sleep(5.0) + response = string_generator(size=self.msg_size, chars=string.digits) + start_time = datetime.datetime.now() + self.connection.sendall(response).encode(ENCODING) + buffer_data = "" + + elif len(buffer_data) > self.msg_size: + print("Too much data; break") + break + + finally: + # Clean up the connection + print(self.id, ": Closing connection") + self.connection.close() + print(self.delays) + to_join.append(self.id) + + +def join_finished_threads(): + """ Join threads whose connection is closed """ + while len(to_join) > 0: + thread_to_join_id = to_join.pop() + threads[thread_to_join_id].join() + del threads[thread_to_join_id] + +# Create a TCP/IP socket +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +# Handle reusing the same 5-tuple if the previous one is still in TIME_WAIT +sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + +# Bind the socket to the port +server_address = ('0.0.0.0', 8000) +print("Stating the server on %s port %s" % server_address) +sock.bind(server_address) + +# Listen for incoming connections +sock.listen(4) + +# Connection identifier +conn_id = 0 + +while True: + # Wait for a connection + connection, client_address = sock.accept() + thread = HandleClientConnectionThread(connection, client_address, conn_id, 1200) + threads[conn_id] = thread + conn_id += 1 + thread.start() + join_finished_threads()