data:image/s3,"s3://crabby-images/bde38/bde38e4944f052c8284408bf1dff6445cf23bbc0" alt="Logo" |
Flowgrind
Advanced TCP traffic generator
|
Go to the documentation of this file.
42 #include <sys/types.h>
43 #include <sys/socket.h>
44 #include <sys/param.h>
45 #include <sys/select.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
58 #include <uuid/uuid.h>
78 #define SOL_TCP IPPROTO_TCP
82 #define SOL_IP IPPROTO_IP
85 #define CONGESTION_LIMIT 10000
115 int requested_response_block_size);
125 vsnprintf(str, 1000, fmt, ap);
127 str[
sizeof(str) - 1] = 0;
138 vsnprintf(str, 1000, fmt, ap);
140 str[
sizeof(str) - 1] = 0;
177 logging(LOG_WARNING,
"failed to cancel dump thread: %s",
183 logging(LOG_WARNING,
"failed to join dump thread: %s",
204 DEBUG_MSG(LOG_WARNING,
"flow %i not started yet (delayed)",
212 DEBUG_MSG(LOG_DEBUG,
"adding sock of flow %d to wfds",
216 DEBUG_MSG(LOG_DEBUG,
"no block for flow %d scheduled "
222 DEBUG_MSG(LOG_WARNING,
"shutting down flow %d (WR)",
224 rc = shutdown(
flow->
fd,SHUT_WR);
226 warn(
"shutdown() SHUT_WR failed");
239 warnx(
"server flow %u missed to shutdown",
flow->
id);
240 rc = shutdown(
flow->
fd, SHUT_RD);
242 warn(
"shutdown SHUT_RD failed");
248 DEBUG_MSG(LOG_ERR,
"late connecting test socket for flow %d "
259 DEBUG_MSG(LOG_DEBUG,
"adding sock of flow %d to rfds",
269 DEBUG_MSG(LOG_DEBUG,
"prepare_fds() called, number of flows: %zu",
335 struct timespec start;
339 if (start.tv_sec <
request->start_timestamp) {
342 start.tv_sec =
request->start_timestamp;
357 for (
int j = 0; j < 2; j++) {
384 DEBUG_MSG(LOG_DEBUG,
"stop_flow forcefully unlocked mutex");
385 pthread_mutex_unlock(&
mutex);
449 DEBUG_MSG(LOG_DEBUG,
"process_requests trying to lock mutex");
450 pthread_mutex_lock(&
mutex);
451 DEBUG_MSG(LOG_DEBUG,
"process_requests locked mutex");
505 pthread_mutex_unlock(&
mutex);
506 DEBUG_MSG(LOG_DEBUG,
"process_requests unlocked mutex");
524 DEBUG_MSG(LOG_DEBUG,
"report_flow called for flow %d (type %d)",
644 DEBUG_MSG(LOG_DEBUG,
"report_flow finished for flow %d (type %d)",
653 struct tcp_info tmp_info;
654 socklen_t info_len =
sizeof(tmp_info);
658 rc = getsockopt(
flow->
fd, IPPROTO_TCP, TCP_INFO, &tmp_info, &info_len);
660 warn(
"getsockopt() failed");
663 #define CPY_INFO_MEMBER(a) info->a = (int) tmp_info.a;
704 DEBUG_MSG(LOG_DEBUG,
"processing timer_check() for flow %d",
726 DEBUG_MSG(LOG_DEBUG,
"finished timer_check()");
736 DEBUG_MSG(LOG_DEBUG,
"processing pselect() for flow %d",
741 DEBUG_MSG(LOG_DEBUG,
"ready for accept");
753 int error_number, rc;
754 socklen_t error_number_size =
755 sizeof(error_number);
756 DEBUG_MSG(LOG_DEBUG,
"sock of flow %d in efds",
758 rc = getsockopt(
flow->
fd, SOL_SOCKET,
760 (
void *)&error_number,
763 warn(
"failed to get errno for"
764 "non-blocking connect");
767 if (error_number != 0) {
768 warnc(error_number,
"connect");
774 DEBUG_MSG(LOG_ERR,
"write_data() failed");
780 DEBUG_MSG(LOG_ERR,
"read_data() failed");
802 struct timespec timeout;
808 DEBUG_MSG(LOG_DEBUG,
"calling pselect() need_timeout: %i",
811 need_timeout ? &timeout : 0, NULL);
815 crit(
"pselect() failed");
817 DEBUG_MSG(LOG_DEBUG,
"pselect() finished");
829 DEBUG_MSG(LOG_DEBUG,
"add_report trying to lock mutex");
830 pthread_mutex_lock(&
mutex);
831 DEBUG_MSG(LOG_DEBUG,
"add_report aquired mutex");
835 pthread_mutex_unlock(&
mutex);
849 pthread_mutex_unlock(&
mutex);
850 DEBUG_MSG(LOG_DEBUG,
"add_report unlocked mutex");
855 const unsigned max_reports = 50;
858 DEBUG_MSG(LOG_DEBUG,
"get_reports trying to lock mutex");
859 pthread_mutex_lock(&
mutex);
860 DEBUG_MSG(LOG_DEBUG,
"get_reports aquired mutex");
871 for (
unsigned i = 0; i < max_reports - 1; i++)
881 pthread_mutex_unlock(&
mutex);
882 DEBUG_MSG(LOG_DEBUG,
"get_reports unlocked mutex");
898 memset(
flow, 0,
sizeof(
struct flow));
940 int response_block_size = 0;
941 double interpacket_gap = .0;
955 htonl(response_block_size);
961 DEBUG_MSG(LOG_DEBUG,
"wrote new request data to out "
962 "buffer bs = %d, rqs = %d, on flow %d",
975 if (errno == EAGAIN) {
976 logging(LOG_WARNING,
"write queue limit hit for "
980 DEBUG_MSG(LOG_WARNING,
"write() returned %d on flow %d, "
989 DEBUG_MSG(LOG_CRIT,
"flow %d sent zero bytes. what "
990 "does that mean?",
flow->
id);
994 DEBUG_MSG(LOG_DEBUG,
"flow %d sent %d request bytes of %u "
995 "(before = %u)",
flow->
id, rc,
1020 if (interpacket_gap) {
1025 char timestamp[30] =
"";
1027 timestamp,
sizeof(timestamp),
true);
1029 "congestion on flow %u new "
1030 "block scheduled for %s, "
1031 "%.6lfs before now",
1043 DEBUG_MSG(LOG_NOTICE,
"failed to recork test "
1044 "socket for flow %d: %s",
1045 flow->
id, strerror(errno));
1062 struct cmsghdr *cmsg;
1068 iov.iov_len = bytes;
1070 msg.msg_name = NULL;
1071 msg.msg_namelen = 0;
1074 msg.msg_control = cbuf;
1075 msg.msg_controllen =
sizeof(cbuf);
1077 rc = recvmsg(
flow->
fd, &msg, 0);
1079 DEBUG_MSG(LOG_DEBUG,
"tried reading %d bytes, got %d", bytes, rc);
1082 if (errno == EAGAIN)
1089 DEBUG_MSG(LOG_ERR,
"server shut down test socket of flow %d",
1092 warnx(
"premature shutdown of server flow");
1105 for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg))
1106 DEBUG_MSG(LOG_NOTICE,
"flow %d received cmsg: type = %u, len = %u",
1107 flow->
id, cmsg->cmsg_type, (socklen_t) cmsg->cmsg_len);
1117 int requested_response_block_size = 0;
1132 optint <= flow->settings.maximum_block_size )
1135 logging(LOG_WARNING,
"flow %d parsed illegal cbs %d, "
1136 "ignoring (max: %d)",
flow->
id, optint,
1141 if (optint == -1 || optint == 0 ||
1143 optint <= flow->settings.maximum_block_size))
1144 requested_response_block_size = optint;
1146 logging(LOG_WARNING,
"flow %d parsed illegal qbs %d, "
1147 "ignoring (max: %d)",
flow->
id, optint,
1150 if (requested_response_block_size == -1) {
1151 DEBUG_MSG(LOG_NOTICE,
"processing response block on "
1152 "flow %d size: %d",
flow->
id,
1155 DEBUG_MSG(LOG_NOTICE,
"processing request block on "
1156 "flow %d size: %d, request: %d",
flow->
id,
1158 requested_response_block_size);
1178 if (requested_response_block_size == -1) {
1192 if (requested_response_block_size >=
1195 requested_response_block_size);
1206 double current_rtt = .0;
1207 struct timespec now;
1208 struct timespec *data = (
struct timespec *)
1214 if (current_rtt < 0) {
1215 logging(LOG_CRIT,
"received malformed rtt block of flow %d "
1216 "(rtt = %.3lfms), ignoring",
flow->
id, current_rtt * 1e3);
1222 if (!isnan(current_rtt)) {
1230 DEBUG_MSG(LOG_NOTICE,
"processed RTT of flow %d (%.3lfms)",
1231 flow->
id, current_rtt * 1e3);
1236 double current_iat = .0;
1237 struct timespec now;
1247 if (current_iat < 0) {
1248 logging(LOG_CRIT,
"calculated malformed iat of flow %d "
1249 "(iat = %.3lfms) (clock skew?), ignoring",
1250 flow->
id, current_iat * 1e3);
1256 if (!isnan(current_iat)) {
1263 DEBUG_MSG(LOG_NOTICE,
"processed IAT of flow %d (%.3lfms)",
1264 flow->
id, current_iat * 1e3);
1269 double current_delay = .0;
1270 struct timespec now;
1271 struct timespec *data = (
struct timespec *)
1277 if (current_delay < 0) {
1278 logging(LOG_NOTICE,
"calculated malformed delay of flow "
1279 "%d (rtt = %.3lfms) (clocks out-of-sync?), ignoring",
1280 flow->
id, current_delay * 1e3);
1281 current_delay = NAN;
1284 if (!isnan(current_delay)) {
1294 DEBUG_MSG(LOG_NOTICE,
"processed delay of flow %d (%.3lfms)",
1295 flow->
id, current_delay * 1e3);
1307 htonl(requested_response_block_size);
1320 DEBUG_MSG(LOG_DEBUG,
"wrote new response data to out buffer bs = %d, "
1321 "rqs = %d on flow %d",
1331 requested_response_block_size -
1334 DEBUG_MSG(LOG_NOTICE,
"send %d bytes response (rqs %d) on flow "
1335 "%d", rc, requested_response_block_size,
flow->
id);
1338 if (errno == EAGAIN) {
1339 DEBUG_MSG(LOG_DEBUG,
"%s, still trying to send "
1340 "response block (write queue hit "
1341 "limit)", strerror(errno));
1345 logging(LOG_WARNING,
"tried to send "
1346 "response block %d times without "
1347 "success, dropping (%s)",
1348 try, strerror(errno));
1352 logging(LOG_WARNING,
"premature end of test: "
1353 "%s, abort flow", strerror(errno));
1363 (
unsigned)requested_response_block_size) {
1365 (
unsigned)requested_response_block_size);
1382 const struct extra_socket_options *option =
1385 switch (option->level) {
1396 level = IPPROTO_SCTP;
1399 level = IPPROTO_TCP;
1402 level = IPPROTO_UDP;
1410 res = setsockopt(
flow->
fd, level, option->optname,
1411 option->optval, option->optlen);
1415 option->optname, strerror(errno));
1431 "algorithm: %s", strerror(errno));
1491 pthread_cond_t cond;
1498 if (pthread_cond_init(&cond, NULL)) {
1504 pthread_mutex_lock(&
mutex);
1516 pthread_cond_wait(&cond, &
mutex);
1518 pthread_mutex_unlock(&
mutex);
1539 static char server_uuid[38] =
"";
1541 if (!strlen(server_uuid)) {
1542 uuid_generate_time(uuid);
1543 uuid_unparse(uuid,uuid_str);
1544 memset(server_uuid,0,
sizeof(server_uuid));
1545 strcpy(server_uuid,uuid_str);
1548 strcpy(uuid_str,server_uuid);
#define DEBUG_MSG(LVL, MSG,...)
Print debug message to standard error.
unsigned response_blocks_read
Packet capture support for the Flowgrind daemon.
#define REQUEST_STOP_FLOW
@ INTERVAL
Intermediated interval report.
static void prepare_wfds(struct timespec *now, struct flow *flow, fd_set *wfds)
#define ASSIGN_MAX(s, c)
Assign value if it's greater than current one.
int apply_extra_socket_options(struct flow *flow)
double delay_sum
Accumulated one-way delay.
unsigned current_read_block_size
void * daemon_main(void *ptr __attribute__((unused)))
void time_add(struct timespec *tp, double seconds)
Add an amount of time seconds to a specific point in time tp.
unsigned random_seed
Random seed to use (default: read /dev/urandom) (option -J).
double iat_min
Minimum interarrival time.
size_t fg_list_size(struct linked_list *const list)
Returns the number of elements in the list.
double rtt_max
Maximum round-trip time.
double time_diff(const struct timespec *tp1, const struct timespec *tp2)
Returns the time difference between two the specific points in time tp1 and tp2.
void request_error(struct request *request, const char *fmt,...)
static xmlrpc_value * add_flow_source(xmlrpc_env *const env, xmlrpc_value *const param_array, void *const user_data)
Prepare data connection for source endpoint.
double rtt_sum
Accumulated round-trip time.
int accept_data(struct flow *flow)
#define crit(...)
To report an critical error w/ the corresponding system error message.
struct report * get_reports(int *has_more)
int dscp
DSCP value for TOS byte (option -D).
double delay_max
Maximum one-way delay.
int fg_list_remove(struct linked_list *const list, const void *const data)
Removes from the list the first element whose data points to data.
bool time_is_after(const struct timespec *tp1, const struct timespec *tp2)
Returns true if second point in time tp2 is chronologically after the first point in time tp1.
int set_flow_tcp_options(struct flow *flow)
unsigned request_blocks_written
#define DEFAULT_SELECT_TIMEOUT
Time select() will block waiting for a file descriptor to become ready.
int next_response_block_size(struct flow *flow)
void logging(int priority, const char *fmt,...)
double rtt_sum
Accumulated round-trip time.
int set_non_blocking(int fd)
unsigned long long bytes_read
double iat_sum
Accumulated inter-arrival time.
struct flow_settings::extra_socket_options extra_socket_options[MAX_EXTRA_SOCKET_OPTIONS]
struct fg_tcp_info tcp_info
Error-reporting routines used by Flowgrind.
#define UNUSED_ARGUMENT(x)
Suppress warning for unused argument.
@ DESTINATION
Endpoint that accepts the connection.
double delay_max
Maximum one-way delay.
double duration[2]
Duration of flow in seconds (option -T).
struct flow_source_settings source_settings
#define warnx(...)
To report a warning w/ a system error message.
int cork
Sets SO_DEBUG on test socket (option -O).
Routines used to setup a Flowgrind destination for a test.
double delay_min
Minimum one-way delay.
static void start_flows(struct request_start_flows *request)
static void report_flow(struct flow *flow, int type)
To prepare a report, report type is either INTERVAL or FINAL.
void * data
Pointer to user defined data stored with this node.
struct timespec next_write_block_timestamp
void init_flow(struct flow *flow, int is_source)
To initialize all flows to the default value.
char cc_alg[TCP_CA_NAME_MAX]
Set congestion control algorithm ALG on test socket (option -O).
double delay_min
Minimum one-way delay.
struct flow_settings settings
struct timespec last_report_time
int set_dscp(int fd, int dscp)
#define CPY_INFO_MEMBER(a)
#define ASSIGN_MIN(s, c)
Assign value if it less than current one.
void get_uuid_string(char *uuid_str)
To generate daemon UUID.
unsigned long long bytes_written
void uninit_flow(struct flow *flow)
Routines used by Flowgrind to setup the source for a test flow.
double time_diff_now(const struct timespec *tp)
Returns time difference between now and the specific point in time tp.
double rtt_max
Maximum round-trip time.
int traffic_dump
Dump traffic using libpcap (option -M).
struct report * reports_last
unsigned current_block_bytes_read
int dispatch_request(struct request *request, int type)
Dispatch a request to daemon loop.
int ipmtudiscover
Set IP_MTU_DISCOVER on test socket (option -O).
int set_tcp_nodelay(int fd)
static void process_iat(struct flow *flow)
int maximum_block_size
Application buffer size in bytes (option -U).
int set_so_elcn(int fd, int val)
int flow_control
Stop flow if it is experiencing local congestion (option -C).
unsigned response_blocks_written
int toggle_tcp_cork(int fd)
double iat_max
Maximum inter-arrival time.
const char * ctimespec_r(const struct timespec *tp, char *buf, size_t size, bool ns)
Converts timespec struct tp into a null-terminated string.
unsigned current_write_block_size
struct request * requests_last
struct request * requests
int num_extra_socket_options
static void process_delay(struct flow *flow)
int pushy
Do not iterate through select() to continue sending in case block size did not suffice to fill sendin...
static int flow_sending(struct timespec *now, struct flow *flow, int direction)
void init_math_functions(struct flow *flow, unsigned long seed)
static void stop_flow(struct request_stop_flow *request)
int nonagle
Disable nagle algorithm on test socket (option -O).
Routines used to manipulate socket parameters for Flowgrind.
int set_ip_mtu_discover(int fd)
#define MIN_BLOCK_SIZE
Minium block (message) size we can send.
static int flow_in_delay(struct timespec *now, struct flow *flow, int direction)
static int prepare_rfds(struct timespec *now, struct flow *flow, fd_set *rfds)
unsigned long long bytes_written
static void process_requests()
To process the request issued from the controller.
double reporting_interval
Interval to report flow on screen (option -i).
static int write_data(struct flow *flow)
int set_route_record(int fd)
static int try_read_n_bytes(struct flow *flow, int bytes)
@ SOURCE
Endpoint that opens the connection.
int elcn
Set TCP_ELCN (20) on test socket (option -O).
Common definitions used by the Flowgrind daemon, controller, and libs.
unsigned request_blocks_read
Timing related routines used by Flowgrind.
unsigned pmtu
Discovered Path MTU.
Flowgrind's data block layout.
void flow_error(struct flow *flow, const char *fmt,...)
struct timespec last_block_read
static void send_response(struct flow *flow, int requested_response_block_size)
double delay[2]
Delay of flow in seconds (option -Y).
unsigned congestion_counter
#define REQUEST_GET_STATUS
static void process_select(fd_set *rfds, fd_set *wfds, fd_set *efds)
#define warn(...)
To report a warning w/ the corresponding system error message.
int lcd
Set TCP_LCD (21) on test socket (option -O).
#define free_all(...)
To free() an arbitrary number of variables.
double delay_sum
Accumulated one-way delay.
static int flow_block_scheduled(struct timespec *now, struct flow *flow)
pthread_cond_t * condition
structure for getting the UUID.
Data structures used by the Flowgrind daemon and controller.
unsigned response_blocks_written
struct timespec last_block_written
void free_math_functions(struct flow *flow)
struct timespec data
Sending timestap for calculating delay and RTT.
struct request r
Daemon thread process the request r.
int get_tcp_info(struct flow *flow, struct fg_tcp_info *info)
struct timespec stop_timestamp[2]
unsigned long long bytes_read
struct timespec start_timestamp[2]
struct timespec next_report_time
double next_interpacket_gap(struct flow *flow)
enum report_t type
Report type - either INTERVAL or FINAL report.
unsigned request_blocks_written
#define warnc(code,...)
To report a warning w/ the system error message 'code'.
int shutdown
Shutdown socket after test flow (option -N).
Single element in a doubly linked list.
double iat_max
Maximum interarrival time.
int next_request_block_size(struct flow *flow)
void remove_flow(struct flow *const flow)
static int read_data(struct flow *flow)
double rtt_min
Minimum round-trip time.
struct timespec data2
Used to access 64bit timespec on 32bit arch.
void add_report(struct report *report)
enum endpoint_t endpoint
Daemon endpoint - either source or destination.
unsigned imtu
Interface MTU.
Routines used by the Flowgrind daemon for advanced traffic generation.
struct timespec first_report_time
void add_flow_destination(struct request_add_flow_destination *request)
To set daemon flow as destination endpoint.
struct list_node * next
Pointer to the previous node in the list.
int do_connect(struct flow *flow)
Establishes a connection of a flow.
struct fg_tcp_info tcp_info
#define REQUEST_START_FLOWS
unsigned current_block_bytes_written
static void timer_check()
unsigned request_blocks_read
int mtcp
Set TCP_MTCP (15) on test socket (option -O).
int set_congestion_control(int fd, const char *cc_alg)
double rtt_min
Minimum round-trip time.
double iat_sum
Accumulated interarrival time.
static void process_rtt(struct flow *flow)
Routines for statistics and advanced traffic generation.
#define REQUEST_ADD_SOURCE
#define REQUEST_ADD_DESTINATION
int so_debug
Sets SO_DEBUG on test socket (option -O).
int gettime(struct timespec *tp)
Returns the current wall-clock time with nanosecond precision.
Debugging routines for Flowgrind controller and daemon.
Routines used by the Flowgrind daemon.
double iat_min
Minimum inter-arrival time.
const struct list_node * fg_list_front(struct linked_list *const list)
Returns the first element of the list.
unsigned response_blocks_read
int route_record
Sets ROUTE_RECORD on test socket (option -O).
struct flow::statistics statistics[2]