mpExperienceSiri

With Java client and Python server
This commit is contained in:
Quentin De Coninck 2016-04-14 15:41:50 +02:00
parent b5ae4673a9
commit 8c56ffca19
5 changed files with 617 additions and 0 deletions

365
src/SiriClient.java Normal file
View File

@ -0,0 +1,365 @@
import java.io.*;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Collections;
public class SiriClient {
private String serverMessage;
/* Client parameters */
public final String SERVERIP; //your computer IP address
public final int SERVERPORT;
public final int RUN_TIME; // In seconds
public final int QUERY_SIZE;
public final int RESPONSE_SIZE;
public final int DELAY_QUERY_RESPONSE;
public final int MIN_PAYLOAD_SIZE;
public final int MAX_PAYLOAD_SIZE;
public final int INTERVAL_TIME_MS;
public final int BUFFER_SIZE;
private boolean mRun = false;
private int messageId = 0;
private static final int MAX_ID = 100;
private Random random;
private long[] sentTime;
private List<Long> delayTime;
private String mac;
private int counter;
private int missed;
PrintWriter out;
BufferedReader in;
OutputStream outputStream;
OutputStreamWriter osw;
Socket socket;
/**
* Constructor of the class. OnMessagedReceived listens for the messages received from server
*/
public SiriClient(String serverIp, int serverPort, int runTime, int querySize, int responseSize,
int delayQueryResponse, int minPayloadSize, int maxPayloadSize, int intervalTimeMs,
int bufferSize) {
random = new Random();
sentTime = new long[MAX_ID];
delayTime = new ArrayList<>();
counter = 0;
missed = 0;
/* Client parameters */
SERVERIP = serverIp;
SERVERPORT = serverPort;
RUN_TIME = runTime;
QUERY_SIZE = querySize;
RESPONSE_SIZE = responseSize;
DELAY_QUERY_RESPONSE = delayQueryResponse;
MIN_PAYLOAD_SIZE = minPayloadSize;
MAX_PAYLOAD_SIZE = maxPayloadSize;
INTERVAL_TIME_MS = intervalTimeMs;
BUFFER_SIZE = bufferSize;
}
public SiriClient(String serverIp, int serverPort, int runTime) {
this(serverIp, serverPort, runTime, 2500, 750, 0, 85, 500, 333, 9);
}
protected String getStringWithLengthAndFilledWithCharacter(int length, char charToFill) {
if (length > 0) {
char[] array = new char[length];
Arrays.fill(array, charToFill);
return new String(array);
}
return "";
}
/**
* Return a random number in [@MIN_PAYLOAD_SIZE, @MAX_PAYLOAD_SIZE]
* It manages 3 cases:
* 1) remainToSend in [@MIN_PAYLOAD_SIZE, @MAX_PAYLOAD_SIZE]: return remainToSend
* 2) remainToSend - @MAX_PAYLOAD_SIZE < MIN_PAYLOAD_SIZE: return random value in
* [@MIN_PAYLOAD_SIZE, @MAX_PAYLOAD_SIZE - @MIN_PAYLOAD_SIZE]
* 3) else, return random value in [@MIN_PAYLOAD_SIZE, @MAX_PAYLOAD_SIZE]
* @param remainToSend number of remaining bytes to send >= MIN_PAYLOAD_SIZE
*/
private int sizeOfNextPacket(int remainToSend) {
if (remainToSend < MIN_PAYLOAD_SIZE) throw new AssertionError();
// Case 1)
if (remainToSend <= MAX_PAYLOAD_SIZE && remainToSend >= MIN_PAYLOAD_SIZE) {
return remainToSend;
}
int randomPart;
// Case 2)
if (remainToSend - MAX_PAYLOAD_SIZE < MIN_PAYLOAD_SIZE) {
randomPart = random.nextInt(MAX_PAYLOAD_SIZE - 2 * MIN_PAYLOAD_SIZE + 1);
}
// Case 3)
else {
randomPart = random.nextInt(MAX_PAYLOAD_SIZE - MIN_PAYLOAD_SIZE + 1);
}
return MIN_PAYLOAD_SIZE + randomPart;
}
/**
* Get a random value following a Poisson distribution of mean lambda
* @param lambda mean of the Poisson distribution
* @return random value following a Poisson distribution of mean lambda
*/
public static int getPoisson(double lambda) {
double L = Math.exp(-lambda);
double p = 1.0;
int k = 0;
do {
k++;
p *= Math.random();
} while (p > L);
return k - 1;
}
/**
* Sends the message entered by client to the server
*/
public void sendMessage() {
// if (out != null && !out.checkError() && osw != null) {
if (socket != null && !socket.isClosed()) {
int remainToBeSent = QUERY_SIZE;
// If the server has a DB, use it, otherwise set to 0
//int delaysToSend = delayTime.size();
int delaysToSend = 0;
StringBuffer sb = new StringBuffer();
// for (int i = 0; i < delaysToSend; i++) {
// sb.append(delayTime.get(0) + "&");
// delayTime.remove(delayTime.get(0));
// }
sentTime[messageId] = System.currentTimeMillis();
String startString = messageId + "&" + QUERY_SIZE + "&" + RESPONSE_SIZE + "&" +
DELAY_QUERY_RESPONSE + "&" + delaysToSend + "&" + sentTime[messageId] +
"&" + mac + "&" + sb.toString();
messageId = (messageId + 1) % MAX_ID;
int bytesToSend = Math.max(startString.length(), sizeOfNextPacket(remainToBeSent));
// System.err.println("BytesToSend: " + bytesToSend);
byte[] toSend;
try {
//osw.write(startString + getStringWithLengthAndFilledWithCharacter(bytesToSend - startString.length(), '0'));
//osw.flush();
toSend = (startString + getStringWithLengthAndFilledWithCharacter(bytesToSend - startString.length(), '0')).getBytes(StandardCharsets.US_ASCII);
outputStream.write(toSend);
outputStream.flush();
} catch (IOException e) {
System.err.println("ERROR: OUTPUT STREAM ERROR");
}
// out.println(startString + getStringWithLengthAndFilledWithCharacter(bytesToSend - startString.length() - 1, '0'));
// System.err.println("Sent " + startString + getStringWithLengthAndFilledWithCharacter(bytesToSend - startString.length() - 1, '0'));
// out.flush();
remainToBeSent -= bytesToSend;
synchronized (this) {
counter += bytesToSend;
}
try {
while(remainToBeSent > 0) {
bytesToSend = sizeOfNextPacket(remainToBeSent);
//out.println(getStringWithLengthAndFilledWithCharacter(bytesToSend - 1, '0'));
//out.flush();
if (remainToBeSent == bytesToSend) {
//osw.write(getStringWithLengthAndFilledWithCharacter(bytesToSend - 1, '0') + "\n");
toSend = (getStringWithLengthAndFilledWithCharacter(bytesToSend - 1, '0') + "\n").getBytes(StandardCharsets.US_ASCII);
} else {
//osw.write(getStringWithLengthAndFilledWithCharacter(bytesToSend, '0'));
toSend = getStringWithLengthAndFilledWithCharacter(bytesToSend, '0').getBytes(StandardCharsets.US_ASCII);
}
//osw.flush();
outputStream.write(toSend);
outputStream.flush();
remainToBeSent -= bytesToSend;
synchronized (this) {
counter += bytesToSend;
}
}
} catch (IOException e) {
System.err.println("ERROR: OUTPUT STREAM ERROR");
e.printStackTrace();
}
}
}
public void stopClient(){
mRun = false;
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static String getIPAddress(boolean useIPv4) {
try {
List<NetworkInterface> interfaces = Collections.list(NetworkInterface.getNetworkInterfaces());
for (NetworkInterface intf : interfaces) {
List<InetAddress> addrs = Collections.list(intf.getInetAddresses());
for (InetAddress addr : addrs) {
if (!addr.isLoopbackAddress()) {
String sAddr = addr.getHostAddress();
//boolean isIPv4 = InetAddressUtils.isIPv4Address(sAddr);
boolean isIPv4 = sAddr.indexOf(':')<0;
if (useIPv4) {
if (isIPv4)
return sAddr;
} else {
if (!isIPv4) {
int delim = sAddr.indexOf('%'); // drop ip6 zone suffix
return delim<0 ? sAddr.toUpperCase() : sAddr.substring(0, delim).toUpperCase();
}
}
}
}
}
} catch (Exception ex) { } // for now eat exceptions
return "";
}
public void run(String macWifi) {
mRun = true;
mac = macWifi;
try {
//here you must put your computer's IP address.
InetAddress serverAddr = InetAddress.getByName(SERVERIP);
System.err.println("Me: " + getIPAddress(true));
System.err.println("TCP Client: Connecting...");
//create a socket to make the connection with the server
socket = new Socket(serverAddr, SERVERPORT);
// Needed to emulate any traffic
socket.setTcpNoDelay(true);
new Thread(new Runnable() {
public void run() {
final long startTime = System.currentTimeMillis();
while (socket != null && !socket.isClosed()) {
try {
Thread.sleep(INTERVAL_TIME_MS); //* getPoisson(3));
if ((System.currentTimeMillis() - startTime) >= RUN_TIME * 1000) {
stopClient();
} else if (!socket.isClosed() && counter <= QUERY_SIZE * BUFFER_SIZE) {
sendMessage();
} else if (!socket.isClosed()) {
missed++;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
try {
//send the message to the server
out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())), true);
outputStream = socket.getOutputStream();
osw = new OutputStreamWriter(socket.getOutputStream());
System.err.println("TCP Client: Done.");
//receive the message which the server sends back
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//in this while the client listens for the messages sent by the server
while (mRun) {
serverMessage = in.readLine();
long receivedTime = System.currentTimeMillis();
// System.err.println("SERVER MESSAGE: " + ((serverMessage == null) ? "" : serverMessage));
if (serverMessage != null) {
int id = Integer.parseInt(serverMessage.split("&")[0]);
// System.err.println("ELAPSED TIME: " + (receivedTime - sentTime[id]));
delayTime.add(receivedTime - sentTime[id]);
synchronized (this) {
counter -= QUERY_SIZE;
}
}
serverMessage = null;
}
//System.err.println("RESPONSE FROM SERVER: Received Message: '" + serverMessage + "'");
} catch (SocketException e) {
System.err.println("TCP: Socket closed");
} catch (Exception e) {
System.err.println("TCP: Error " + e);
} finally {
//the socket must be closed. It is not possible to reconnect to this socket
// after it is closed, which means a new socket instance has to be created.
socket.close();
}
} catch (Exception e) {
System.err.println("TCP C: Error" + e);
}
}
public static void usage() {
System.out.println("Usage: siriClient serverIP serverPort runTime [querySize responseSize delayQueryResponse "
+ "minPayloadSize maxPayloadSize intervalTimeMs bufferSize]");
}
public static void main(String[] args) {
if (args.length != 3 && args.length != 10) {
usage();
System.exit(1);
}
String serverIp = args[0];
int serverPort = Integer.parseInt(args[1]);
int runTime = Integer.parseInt(args[2]);
SiriClient siriClient;
if (args.length == 10) {
int querySize = Integer.parseInt(args[3]);
int responseSize = Integer.parseInt(args[4]);
int delayQueryResponse = Integer.parseInt(args[5]);
int minPayloadSize = Integer.parseInt(args[6]);
int maxPayloadSize = Integer.parseInt(args[7]);
int intervalTimeMs = Integer.parseInt(args[8]);
int bufferSize = Integer.parseInt(args[9]);
siriClient = new SiriClient(serverIp, serverPort, runTime, querySize, responseSize, delayQueryResponse,
minPayloadSize, maxPayloadSize, intervalTimeMs, bufferSize);
} else {
siriClient = new SiriClient(serverIp, serverPort, runTime);
}
String mac = "00:00:00:00:00:00";
siriClient.run(mac);
System.out.println("missed: " + siriClient.missed);
for (long delay: siriClient.delayTime) {
System.out.println(delay);
}
}
}

81
src/mpExperienceSiri.py Normal file
View File

@ -0,0 +1,81 @@
from mpExperience import MpExperience
from mpParamXp import MpParamXp
from mpPvAt import MpPvAt
import os
class MpExperienceSiri(MpExperience):
SERVER_LOG = "siri_server.log"
CLIENT_LOG = "siri_client.log"
JAVA_BIN = "java"
PING_OUTPUT = "ping.log"
def __init__(self, xpParamFile, mpTopo, mpConfig):
MpExperience.__init__(self, xpParamFile, mpTopo, mpConfig)
self.loadParam()
self.ping()
MpExperience.classicRun(self)
def ping(self):
self.mpTopo.commandTo(self.mpConfig.client, "rm " + \
MpExperienceSiri.PING_OUTPUT )
count = self.xpParam.getParam(MpParamXp.PINGCOUNT)
for i in range(0, self.mpConfig.getClientInterfaceCount()):
cmd = self.pingCommand(self.mpConfig.getClientIP(i),
self.mpConfig.getServerIP(), n = count)
self.mpTopo.commandTo(self.mpConfig.client, cmd)
def pingCommand(self, fromIP, toIP, n=5):
s = "ping -c " + str(n) + " -I " + fromIP + " " + toIP + \
" >> " + MpExperienceSiri.PING_OUTPUT
print(s)
return s
def loadParam(self):
"""
todo : param LD_PRELOAD ??
"""
self.run_time = self.xpParam.getParam(MpParamXp.SIRIRUNTIME)
self.query_size = self.xpParam.getParam(MpParamXp.SIRIQUERYSIZE)
self.response_size = self.xpParam.getParam(MpParamXp.SIRIRESPONSESIZE)
self.delay_query_response = self.xpParam.getParam(MpParamXp.SIRIDELAYQUERYRESPONSE)
self.min_payload_size = self.xpParam.getParam(MpParamXp.SIRIMINPAYLOADSIZE)
self.max_payload_size = self.xpParam.getParam(MpParamXp.SIRIMAXPAYLOADSIZE)
self.interval_time_ms = self.xpParam.getParam(MpParamXp.SIRIINTERVALTIMEMS)
self.buffer_size = self.xpParam.getParam(MpParamXp.SIRIBUFFERSIZE)
# Little trick here
self.xpParam.defaultValue[MpParamXp.AUTOCORK] = "0"
def prepare(self):
MpExperience.prepare(self)
self.mpTopo.commandTo(self.mpConfig.client, "rm " + \
MpExperienceSiri.CLIENT_LOG)
self.mpTopo.commandTo(self.mpConfig.server, "rm " + \
MpExperienceSiri.SERVER_LOG)
def getSiriServerCmd(self):
s = "python " + os.path.dirname(os.path.abspath(__file__)) + \
"/siri_server.py &>" + MpExperienceSiri.SERVER_LOG + "&"
print(s)
return s
def getSiriClientCmd(self):
s = MpExperienceSiri.JAVA_BIN + " -jar siriClient.jar" + self.mpConfig.getServerIP() + \
" 8080 " + self.run_time + " " + self.query_size + " " + self.response_size + \
" " + self.delay_query_response + " " + self.min_payload_size + " " + \
self.max_payload_size + " " + self.interval_time_ms + " " + self.buffer_size + \
" &>" + MpExperienceSiri.CLIENT_LOG
print(s)
return s
def clean(self):
MpExperience.clean(self)
def run(self):
cmd = self.getSiriServerCmd()
self.mpTopo.commandTo(self.mpConfig.server, cmd)
self.mpTopo.commandTo(self.mpConfig.client, "sleep 2")
cmd = self.getSiriClientCmd()
self.mpTopo.commandTo(self.mpConfig.client, cmd)
self.mpTopo.commandTo(self.mpConfig.client, "sleep 2")

View File

@ -6,6 +6,7 @@ class MpParamXp(MpParam):
WMEM = "wmem" WMEM = "wmem"
SCHED = "sched" SCHED = "sched"
CC = "congctrl" CC = "congctrl"
AUTOCORK = "autocork"
KERNELPM = "kpm" KERNELPM = "kpm"
KERNELPMC = "kpmc" #kernel path manager client / server KERNELPMC = "kpmc" #kernel path manager client / server
KERNELPMS = "kpms" KERNELPMS = "kpms"
@ -38,6 +39,14 @@ class MpParamXp(MpParam):
NETPERFREQRESSIZE = "netperfReqresSize" NETPERFREQRESSIZE = "netperfReqresSize"
ABCONCURRENTREQUESTS = "abConccurentRequests" ABCONCURRENTREQUESTS = "abConccurentRequests"
ABTIMELIMIT = "abTimelimit" ABTIMELIMIT = "abTimelimit"
SIRIRUNTIME = "siriRunTime"
SIRIQUERYSIZE = "siriQuerySize"
SIRIRESPONSESIZE = "siriResponseSize"
SIRIDELAYQUERYRESPONSE = "siriDelayQueryResponse"
SIRIMINPAYLOADSIZE = "siriMinPayloadSize"
SIRIMAXPAYLOADSIZE = "siriMaxPayloadSize"
SIRIINTERVALTIMEMS = "siriIntervalTimeMs"
SIRIBUFFERSIZE = "siriBufferSize"
# global sysctl # global sysctl
@ -47,6 +56,7 @@ class MpParamXp(MpParam):
sysctlKey[KERNELPM] = "net.mptcp.mptcp_path_manager" sysctlKey[KERNELPM] = "net.mptcp.mptcp_path_manager"
sysctlKey[SCHED] = "net.mptcp.mptcp_scheduler" sysctlKey[SCHED] = "net.mptcp.mptcp_scheduler"
sysctlKey[CC] = "net.ipv4.tcp_congestion_control" sysctlKey[CC] = "net.ipv4.tcp_congestion_control"
sysctlKey[AUTOCORK] = "net.ipv4.tcp_autocorking"
sysctlKeyClient = {} sysctlKeyClient = {}
sysctlKeyClient[KERNELPMC] = "net.mptcp.mptcp_path_manager" sysctlKeyClient[KERNELPMC] = "net.mptcp.mptcp_path_manager"
@ -66,6 +76,7 @@ class MpParamXp(MpParam):
defaultValue[USERPMSARGS] = "" defaultValue[USERPMSARGS] = ""
defaultValue[CC] = "olia" defaultValue[CC] = "olia"
defaultValue[SCHED] = "default" defaultValue[SCHED] = "default"
defaultValue[AUTOCORK] = "1"
defaultValue[CLIENTPCAP] = "no" defaultValue[CLIENTPCAP] = "no"
defaultValue[SERVERPCAP] = "no" defaultValue[SERVERPCAP] = "no"
@ -91,6 +102,13 @@ class MpParamXp(MpParam):
defaultValue[NETPERFREQRESSIZE] = "2K,256" defaultValue[NETPERFREQRESSIZE] = "2K,256"
defaultValue[ABCONCURRENTREQUESTS] = "50" defaultValue[ABCONCURRENTREQUESTS] = "50"
defaultValue[ABTIMELIMIT] = "20" defaultValue[ABTIMELIMIT] = "20"
defaultValue[SIRIQUERYSIZE] = "2500"
defaultValue[SIRIRESPONSESIZE] = "750"
defaultValue[SIRIDELAYQUERYRESPONSE] = "0"
defaultValue[SIRIMINPAYLOADSIZE] = "85"
defaultValue[SIRIMAXPAYLOADSIZE] = "500"
defaultValue[SIRIINTERVALTIMEMS] = "333"
defaultValue[SIRIBUFFERSIZE] = "9"
def __init__(self, paramFile): def __init__(self, paramFile):
MpParam.__init__(self, paramFile) MpParam.__init__(self, paramFile)

BIN
src/siriClient.jar Normal file

Binary file not shown.

153
src/siri_server.py Normal file
View File

@ -0,0 +1,153 @@
import socket
import sys
import threading
import time
BUFFER_SIZE = 512
ENCODING = 'ascii'
MAX_SECURITY_HEADER_SIZE = 128
MIN_FIELDS_NB = 8
threads = {}
to_join = []
delay_results = {}
time_sent = {}
mac = {}
class HandleClientConnectionThread(threading.Thread):
""" Handle requests given by the client """
def __init__(self, connection, client_address, id):
threading.Thread.__init__(self)
self.connection = connection
self.client_address = client_address
self.id = id
def run(self):
try:
print(self.id, ": Connection from", self.client_address)
# Some useful variables
first_data = True
expected_req_size = 0
res_size = 0
waiting_time = 0
expected_delay_results = 0
buffer_data = ""
next_buffer_data = ""
message_sizes = []
message_id = 0
# Initialize logging variables
delay_results[self.id] = []
time_sent[self.id] = []
# Receive the data and retransmit it
while True:
data = self.connection.recv(BUFFER_SIZE).decode(ENCODING)
if len(data) == 0:
# Connection closed at remote; close the connection
break
# Manage the case where the client is very very quick
if not data.endswith("\n") and "\n" in data:
carriage_index = data.index("\n")
buffer_data += data[:carriage_index + 1]
next_buffer_data += data[carriage_index + 1:]
used_data = data[:carriage_index + 1]
else:
buffer_data += data
used_data = data
message_sizes.append(len(used_data))
if first_data and len(buffer_data) <= MAX_SECURITY_HEADER_SIZE and len(buffer_data.split("&")) < MIN_FIELDS_NB:
continue
if first_data:
split_data = buffer_data.split("&")
expected_delay_results = int(split_data[4])
if len(split_data) - MIN_FIELDS_NB == expected_delay_results:
message_id = int(split_data[0])
expected_req_size = int(split_data[1])
res_size = int(split_data[2])
waiting_time = int(split_data[3])
time_sent[self.id].append(int(split_data[5]))
# Little check, we never know
if self.id not in mac:
mac[self.id] = split_data[6]
elif not mac[self.id] == split_data[6]:
print(self.id, ": MAC changed from", mac[self.id], "to", split_data[6], ": close the connection", file=sys.stderr)
break
for i in range(expected_delay_results):
delay_results[self.id].append(int(split_data[7 + i]))
first_data = False
if len(delay_results[self.id]) % 10 == 0:
print(self.id, ":", time_sent[self.id])
print(self.id, ":", delay_results[self.id])
else:
# Avoid further processing, wait for additional packets
continue
if len(buffer_data) == expected_req_size:
print(self.id, ": Received request of size", expected_req_size, "; send response of", res_size, "after", waiting_time, "s")
time.sleep(waiting_time)
self.connection.sendall((str(message_id) + "&" + "0" * (res_size - len(str(message_id)) - 2) + "\n").encode(ENCODING))
first_data = True
buffer_data = ""
message_sizes = []
if len(next_buffer_data) > 0:
buffer_data = next_buffer_data
next_buffer_data = ""
message_sizes.append(len(buffer_data))
elif len(buffer_data) > expected_req_size:
print(len(buffer_data), len(data), len(used_data), file=sys.stderr)
print(self.id, ": Expected request of size", expected_req_size, "but received request of size", len(buffer_data), "; close the connection", file=sys.stderr)
print(buffer_data, file=sys.stderr)
break
finally:
# Clean up the connection
print(self.id, ": Closing connection")
self.connection.close()
to_join.append(self.id)
def join_finished_threads():
""" Join threads whose connection is closed """
while len(to_join) > 0:
thread_to_join_id = to_join.pop()
threads[thread_to_join_id].join()
del threads[thread_to_join_id]
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Handle reusing the same 5-tuple if the previous one is still in TIME_WAIT
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# Bind the socket to the port
server_address = ('0.0.0.0', 8080)
print("Stating the server on %s port %s" % server_address)
sock.bind(server_address)
# Listen for incoming connections
sock.listen(4)
# Connection identifier
conn_id = 0
while True:
# Wait for a connection
connection, client_address = sock.accept()
thread = HandleClientConnectionThread(connection, client_address, conn_id)
threads[conn_id] = thread
conn_id += 1
thread.start()
join_finished_threads()