msg: new experience to mimic messaging traffic

This commit is contained in:
Quentin De Coninck 2017-01-09 16:03:05 +01:00
parent 8476882cf2
commit d56a6542a8
5 changed files with 235 additions and 0 deletions

View File

@ -16,6 +16,7 @@ class MpExperience:
VLC = "vlc"
IPERF = "iperf"
DITG = "ditg"
MSG = "msg"
def __init__(self, xpParam, mpTopo, mpConfig):
self.xpParam = xpParam

73
src/mpExperienceMsg.py Normal file
View File

@ -0,0 +1,73 @@
from mpExperience import MpExperience
from mpParamXp import MpParamXp
import os
class MpExperienceMsg(MpExperience):
SERVER_LOG = "msg_server.log"
CLIENT_LOG = "msg_client.log"
CLIENT_ERR = "msg_client.err"
PING_OUTPUT = "ping.log"
def __init__(self, xpParamFile, mpTopo, mpConfig):
MpExperience.__init__(self, xpParamFile, mpTopo, mpConfig)
self.loadParam()
self.ping()
MpExperience.classicRun(self)
def ping(self):
self.mpTopo.commandTo(self.mpConfig.client, "rm " + \
MpExperienceMsg.PING_OUTPUT )
count = self.xpParam.getParam(MpParamXp.PINGCOUNT)
for i in range(0, self.mpConfig.getClientInterfaceCount()):
cmd = self.pingCommand(self.mpConfig.getClientIP(i),
self.mpConfig.getServerIP(), n = count)
self.mpTopo.commandTo(self.mpConfig.client, cmd)
def pingCommand(self, fromIP, toIP, n=5):
s = "ping -c " + str(n) + " -I " + fromIP + " " + toIP + \
" >> " + MpExperienceMsg.PING_OUTPUT
print(s)
return s
def loadParam(self):
"""
todo : param LD_PRELOAD ??
"""
pass
def prepare(self):
MpExperience.prepare(self)
self.mpTopo.commandTo(self.mpConfig.client, "rm " + \
MpExperienceMsg.CLIENT_LOG)
self.mpTopo.commandTo(self.mpConfig.server, "rm " + \
MpExperienceMsg.SERVER_LOG)
def getSiriServerCmd(self):
s = "python3 " + os.path.dirname(os.path.abspath(__file__)) + \
"/msg_server.py &>" + MpExperienceMsg.SERVER_LOG + "&"
print(s)
return s
def getSiriClientCmd(self):
s = "python3 " + os.path.dirname(os.path.abspath(__file__)) + \
"/msg_client.py &>" + MpExperienceMsg.CLIENT_LOG + "&"
print(s)
return s
def clean(self):
MpExperience.clean(self)
def run(self):
cmd = self.getSiriServerCmd()
self.mpTopo.commandTo(self.mpConfig.server, "netstat -sn > netstat_server_before")
self.mpTopo.commandTo(self.mpConfig.server, cmd)
self.mpTopo.commandTo(self.mpConfig.client, "sleep 2")
cmd = self.getSiriClientCmd()
self.mpTopo.commandTo(self.mpConfig.client, "netstat -sn > netstat_client_before")
self.mpTopo.commandTo(self.mpConfig.client, cmd)
self.mpTopo.commandTo(self.mpConfig.server, "netstat -sn > netstat_server_after")
self.mpTopo.commandTo(self.mpConfig.client, "netstat -sn > netstat_client_after")
self.mpTopo.commandTo(self.mpConfig.server, "pkill -f siri_server.py")
self.mpTopo.commandTo(self.mpConfig.client, "sleep 2")

View File

@ -19,6 +19,7 @@ from mpExperienceSiri import MpExperienceSiri
from mpExperienceVLC import MpExperienceVLC
from mpExperienceIperf import MpExperienceIperf
from mpExperienceDITG import MpExperienceDITG
from mpExperienceMsg import MpExperienceMsg
from mpExperienceNone import MpExperienceNone
from mpExperience import MpExperience
from mpECMPSingleInterfaceTopo import MpECMPSingleInterfaceTopo
@ -122,6 +123,8 @@ class MpXpRunner:
self.mpTopoConfig)
elif xp == MpExperience.DITG:
MpExperienceDITG(self.xpParam, self.mpTopo, self.mpTopoConfig)
elif xp == MpExperience.MSG:
MpExperienceMsg(self.xpParam, self.mpTopo, self.mpTopoConfig)
else:
print("Unfound xp type..." + xp)

59
src/msg_client.py Normal file
View File

@ -0,0 +1,59 @@
import datetime
import random
import socket
import string
import time
BUFFER_SIZE = 2048
ENCODING = 'ascii'
MSG_SIZE = 1200
threads = {}
to_join = []
def string_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Handle reusing the same 5-tuple if the previous one is still in TIME_WAIT
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# Bind the socket to the port
server_address = ('10.1.0.1', 8000)
print("Try to connect to %s port %s" % server_address)
sock.connect(server_address)
delays = []
try:
for i in range(5):
time.sleep(5.0)
request = string_generator(size=MSG_SIZE, chars=string.digits)
start_time = datetime.datetime.now()
sock.sendall(request).encode(ENCODING)
buffer_data = ""
while len(buffer_data) < MSG_SIZE:
data = sock.recv(BUFFER_SIZE).decode(ENCODING)
if len(data) == 0:
# Connection closed at remote; close the connection
break
buffer_data += data
if len(buffer_data) == MSG_SIZE:
stop_time = datetime.datetime.now()
delays.append(stop_time - start_time)
else:
print("An issue occured...")
break
finally:
# Clean up the connection
print("Closing connection")
sock.close()
print(delays)

99
src/msg_server.py Normal file
View File

@ -0,0 +1,99 @@
import datetime
import random
import socket
import string
import threading
import time
BUFFER_SIZE = 2048
ENCODING = 'ascii'
threads = {}
to_join = []
def string_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
class HandleClientConnectionThread(threading.Thread):
""" Handle requests given by the client """
def __init__(self, connection, client_address, id, msg_size):
threading.Thread.__init__(self)
self.connection = connection
self.client_address = client_address
self.id = id
self.msg_size = msg_size
self.delays = []
def run(self):
try:
print(self.id, ": Connection from", self.client_address)
start_time = None
buffer_data = ""
# Receive the data and retransmit it
while True:
data = self.connection.recv(BUFFER_SIZE).decode(ENCODING)
if len(data) == 0:
# Connection closed at remote; close the connection
break
buffer_data += data
if len(buffer_data) == self.msg_size:
stop_time = datetime.datetime.now()
if start_time:
self.delays.append(stop_time - start_time)
time.sleep(5.0)
response = string_generator(size=self.msg_size, chars=string.digits)
start_time = datetime.datetime.now()
self.connection.sendall(response).encode(ENCODING)
buffer_data = ""
elif len(buffer_data) > self.msg_size:
print("Too much data; break")
break
finally:
# Clean up the connection
print(self.id, ": Closing connection")
self.connection.close()
print(self.delays)
to_join.append(self.id)
def join_finished_threads():
""" Join threads whose connection is closed """
while len(to_join) > 0:
thread_to_join_id = to_join.pop()
threads[thread_to_join_id].join()
del threads[thread_to_join_id]
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Handle reusing the same 5-tuple if the previous one is still in TIME_WAIT
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# Bind the socket to the port
server_address = ('0.0.0.0', 8000)
print("Stating the server on %s port %s" % server_address)
sock.bind(server_address)
# Listen for incoming connections
sock.listen(4)
# Connection identifier
conn_id = 0
while True:
# Wait for a connection
connection, client_address = sock.accept()
thread = HandleClientConnectionThread(connection, client_address, conn_id, 1200)
threads[conn_id] = thread
conn_id += 1
thread.start()
join_finished_threads()