Flowgrind
Advanced TCP traffic generator
daemon.c
Go to the documentation of this file.
1 
6 /*
7  * Copyright (C) 2010-2013 Christian Samsel <christian.samsel@rwth-aachen.de>
8  * Copyright (C) 2009 Tim Kosse <tim.kosse@gmx.de>
9  * Copyright (C) 2007-2008 Daniel Schaffrath <daniel.schaffrath@mac.com>
10  *
11  * This file is part of Flowgrind.
12  *
13  * Flowgrind is free software: you can redistribute it and/or modify
14  * it under the terms of the GNU General Public License as published by
15  * the Free Software Foundation, either version 3 of the License, or
16  * (at your option) any later version.
17  *
18  * Flowgrind is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with Flowgrind. If not, see <http://www.gnu.org/licenses/>.
25  *
26  */
27 
28 #ifdef HAVE_CONFIG_H
29 #include "config.h"
30 #endif /* HAVE_CONFIG_H */
31 
32 #include <assert.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <stdarg.h>
36 #include <stdbool.h>
37 #include <strings.h>
38 #include <signal.h>
39 #include <string.h>
40 #include <fcntl.h>
41 #include <math.h>
42 #include <sys/types.h>
43 #include <sys/socket.h>
44 #include <sys/param.h>
45 #include <sys/select.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
48 #include <unistd.h>
49 #include <sys/wait.h>
50 #include <errno.h>
51 #include <time.h>
52 #include <syslog.h>
53 #include <sys/time.h>
54 #include <netdb.h>
55 #include <pthread.h>
56 #include <inttypes.h>
57 #include <float.h>
58 #include <uuid/uuid.h>
59 
60 #include "common.h"
61 #include "debug.h"
62 #include "fg_error.h"
63 #include "fg_math.h"
64 #include "fg_definitions.h"
65 #include "fg_socket.h"
66 #include "fg_time.h"
67 #include "fg_log.h"
68 #include "daemon.h"
69 #include "source.h"
70 #include "destination.h"
71 #include "trafgen.h"
72 
73 #ifdef HAVE_LIBPCAP
74 #include "fg_pcap.h"
75 #endif /* HAVE_LIBPCAP */
76 
77 #ifndef SOL_TCP
78 #define SOL_TCP IPPROTO_TCP
79 #endif /* SOL_TCP */
80 
81 #ifndef SOL_IP
82 #define SOL_IP IPPROTO_IP
83 #endif /* SOL_IP */
84 
85 #define CONGESTION_LIMIT 10000
86 
87 int daemon_pipe[2];
88 
89 pthread_t daemon_thread;
91 char *dump_dir;
92 
93 pthread_mutex_t mutex;
94 struct request *requests = 0, *requests_last = 0;
95 
96 fd_set rfds, wfds, efds;
97 int maxfd;
98 
99 struct report* reports = 0;
100 struct report* reports_last = 0;
101 unsigned pending_reports = 0;
102 
104 
105 char started = 0;
106 
107 /* Forward declarations */
108 static int write_data(struct flow *flow);
109 static int read_data(struct flow *flow);
110 static void process_rtt(struct flow* flow);
111 static void process_iat(struct flow* flow);
112 static void process_delay(struct flow* flow);
113 static void report_flow(struct flow* flow, int type);
114 static void send_response(struct flow* flow,
115  int requested_response_block_size);
116 int get_tcp_info(struct flow *flow, struct fg_tcp_info *info);
117 
118 
119 void flow_error(struct flow *flow, const char *fmt, ...)
120 {
121  char str[1000];
122  va_list ap;
123 
124  va_start(ap, fmt);
125  vsnprintf(str, 1000, fmt, ap);
126  va_end(ap);
127  str[sizeof(str) - 1] = 0;
128  flow->error = malloc(strlen(str) + 1);
129  strcpy(flow->error, str);
130 }
131 
132 void request_error(struct request *request, const char *fmt, ...)
133 {
134  char str[1000];
135  va_list ap;
136 
137  va_start(ap, fmt);
138  vsnprintf(str, 1000, fmt, ap);
139  va_end(ap);
140  str[sizeof(str) - 1] = 0;
141  request->error = malloc(strlen(str) + 1);
142  strcpy(request->error, str);
143 }
144 
145 static inline int flow_in_delay(struct timespec *now, struct flow *flow,
146  int direction)
147 {
148  return time_is_after(&flow->start_timestamp[direction], now);
149 }
150 
151 
152 static inline int flow_sending(struct timespec *now, struct flow *flow,
153  int direction)
154 {
155  return !flow_in_delay(now, flow, direction) &&
156  (flow->settings.duration[direction] < 0 ||
157  time_diff_now(&flow->stop_timestamp[direction]) < 0.0);
158 }
159 
160 static inline int flow_block_scheduled(struct timespec *now, struct flow *flow)
161 {
163 }
164 
165 void uninit_flow(struct flow *flow)
166 {
167  DEBUG_MSG(LOG_DEBUG,"uninit_flow() called for flow %d",flow->id);
168  if (flow->fd != -1)
169  close(flow->fd);
170  if (flow->listenfd_data != -1)
171  close(flow->listenfd_data);
172 #ifdef HAVE_LIBPCAP
173  int rc;
175  rc = pthread_cancel(flow->pcap_thread);
176  if (rc)
177  logging(LOG_WARNING, "failed to cancel dump thread: %s",
178  strerror(rc));
179 
180  /* wait for the dump thread to react to the cancellation request */
181  rc = pthread_join(flow->pcap_thread, NULL);
182  if (rc)
183  logging(LOG_WARNING, "failed to join dump thread: %s",
184  strerror(rc));
185  }
186 #endif /* HAVE_LIBPCAP */
189 }
190 
191 void remove_flow(struct flow * const flow)
192 {
194  free(flow);
195  if (!fg_list_size(&flows))
196  started = 0;
197 }
198 
199 static void prepare_wfds(struct timespec *now, struct flow *flow, fd_set *wfds)
200 {
201  int rc = 0;
202 
203  if (flow_in_delay(now, flow, WRITE)) {
204  DEBUG_MSG(LOG_WARNING, "flow %i not started yet (delayed)",
205  flow->id);
206  return;
207  }
208 
209  if (flow_sending(now, flow, WRITE)) {
210  assert(!flow->finished[WRITE]);
211  if (flow_block_scheduled(now, flow)) {
212  DEBUG_MSG(LOG_DEBUG, "adding sock of flow %d to wfds",
213  flow->id);
214  FD_SET(flow->fd, wfds);
215  } else {
216  DEBUG_MSG(LOG_DEBUG, "no block for flow %d scheduled "
217  "yet", flow->id);
218  }
219  } else if (!flow->finished[WRITE]) {
220  flow->finished[WRITE] = 1;
221  if (flow->settings.shutdown) {
222  DEBUG_MSG(LOG_WARNING, "shutting down flow %d (WR)",
223  flow->id);
224  rc = shutdown(flow->fd,SHUT_WR);
225  if (rc == -1)
226  warn("shutdown() SHUT_WR failed");
227  }
228  }
229 
230  return;
231 }
232 
233 static int prepare_rfds(struct timespec *now, struct flow *flow, fd_set *rfds)
234 {
235  int rc = 0;
236 
237  if (!flow_in_delay(now, flow, READ) && !flow_sending(now, flow, READ)) {
238  if (!flow->finished[READ] && flow->settings.shutdown) {
239  warnx("server flow %u missed to shutdown", flow->id);
240  rc = shutdown(flow->fd, SHUT_RD);
241  if (rc == -1)
242  warn("shutdown SHUT_RD failed");
243  flow->finished[READ] = 1;
244  }
245  }
246 
248  DEBUG_MSG(LOG_ERR, "late connecting test socket for flow %d "
249  "after %.3fs delay",
251  if (do_connect(flow) == -1) {
252  return -1;
253  }
254  }
255 
256  /* Altough the server flow might be finished we keep the socket in
257  * rfd in order to check for buggy servers */
258  if (flow->connect_called && !flow->finished[READ]) {
259  DEBUG_MSG(LOG_DEBUG, "adding sock of flow %d to rfds",
260  flow->id);
261  FD_SET(flow->fd, rfds);
262  }
263 
264  return 0;
265 }
266 
267 static int prepare_fds() {
268 
269  DEBUG_MSG(LOG_DEBUG, "prepare_fds() called, number of flows: %zu",
270  fg_list_size(&flows));
271 
272  FD_ZERO(&rfds);
273  FD_ZERO(&wfds);
274  FD_ZERO(&efds);
275 
276  FD_SET(daemon_pipe[0], &rfds);
277  maxfd = daemon_pipe[0];
278 
279  struct timespec now;
280  gettime(&now);
281 
282  const struct list_node *node = fg_list_front(&flows);
283  while (node) {
284  struct flow *flow = node->data;
285  node = node->next;
286 
287  if (started &&
288  (flow->finished[READ] ||
289  !flow->settings.duration[READ] ||
290  (!flow_in_delay(&now, flow, READ) &&
291  !flow_sending(&now, flow, READ))) &&
292  (flow->finished[WRITE] ||
294  (!flow_in_delay(&now, flow, WRITE) &&
295  !flow_sending(&now, flow, WRITE)))) {
296 
297  /* On Other OSes than Linux or FreeBSD, tcp_info will contain all zeroes */
301  ? 0 : 1;
302 
303  flow->pmtu = get_pmtu(flow->fd);
304 
308  uninit_flow(flow);
309  remove_flow(flow);
310  continue;
311  }
312 
313  if (flow->state == GRIND_WAIT_ACCEPT &&
314  flow->listenfd_data != -1) {
315  FD_SET(flow->listenfd_data, &rfds);
316  maxfd = MAX(maxfd, flow->listenfd_data);
317  }
318 
319  if (!started)
320  continue;
321 
322  if (flow->fd != -1) {
323  FD_SET(flow->fd, &efds);
324  maxfd = MAX(maxfd, flow->fd);
325  prepare_wfds(&now, flow, &wfds);
326  prepare_rfds(&now, flow, &rfds);
327  }
328  }
329 
330  return fg_list_size(&flows);
331 }
332 
334 {
335  struct timespec start;
336  gettime(&start);
337 
338 #if 0
339  if (start.tv_sec < request->start_timestamp) {
340  /* If the clock is synchronized between nodes, all nodes will
341  * start at the same time regardless of any RPC delays */
342  start.tv_sec = request->start_timestamp;
343  start.tv_nsec = 0;
344  }
345 #else /* 0 */
347 #endif /* 0 */
348 
349  const struct list_node *node = fg_list_front(&flows);
350  while (node) {
351  struct flow *flow = node->data;
352  node = node->next;
353  /* initalize random number generator etc */
355 
356  /* READ and WRITE */
357  for (int j = 0; j < 2; j++) {
358  flow->start_timestamp[j] = start;
360  flow->settings.delay[j]);
361  if (flow->settings.duration[j] >= 0) {
362  flow->stop_timestamp[j] =
363  flow->start_timestamp[j];
365  flow->settings.duration[j]);
366  }
367  }
370 
374 
377  }
378 
379  started = 1;
380 }
381 
382 static void stop_flow(struct request_stop_flow *request)
383 {
384  DEBUG_MSG(LOG_DEBUG, "stop_flow forcefully unlocked mutex");
385  pthread_mutex_unlock(&mutex);
386 
387  if (request->flow_id == -1) {
388  /* Stop all flows */
389 
390  const struct list_node *node = fg_list_front(&flows);
391  while (node) {
392  struct flow *flow = node->data;
393  node = node->next;
394 
398  ? 0 : 1;
399  flow->pmtu = get_pmtu(flow->fd);
400 
404 
405  uninit_flow(flow);
406  remove_flow(flow);
407  }
408 
409  return;
410  }
411 
412  const struct list_node *node = fg_list_front(&flows);
413  while (node) {
414  struct flow *flow = node->data;
415  node = node->next;
416 
417  if (flow->id != request->flow_id)
418  continue;
419 
420  /* On Other OSes than Linux or FreeBSD, tcp_info will contain all zeroes */
424  ? 0 : 1;
425  flow->pmtu = get_pmtu(flow->fd);
426 
430 
431  uninit_flow(flow);
432  remove_flow(flow);
433  return;
434  }
435 
436  request_error(&request->r, "Unknown flow id");
437 }
438 
446 static void process_requests()
447 {
448  int rc;
449  DEBUG_MSG(LOG_DEBUG, "process_requests trying to lock mutex");
450  pthread_mutex_lock(&mutex);
451  DEBUG_MSG(LOG_DEBUG, "process_requests locked mutex");
452 
453  char tmp[100];
454  for (;;) {
455  int rc = read(daemon_pipe[0], tmp, 100);
456  if (rc != 100)
457  break;
458  }
459 
460  while (requests) {
461  struct request* request = requests;
463  rc = 0;
464 
465  switch (request->type) {
467  add_flow_destination((struct
469  *)request);
470  break;
471  case REQUEST_ADD_SOURCE:
472  rc = add_flow_source((struct
474  *)request);
475  break;
476  case REQUEST_START_FLOWS:
478  break;
479  case REQUEST_STOP_FLOW:
481  break;
482  case REQUEST_GET_STATUS:
483  {
484  struct request_get_status *r =
485  (struct request_get_status *)request;
486  r->started = started;
487  r->num_flows = fg_list_size(&flows);
488  }
489  break;
490  case REQUEST_GET_UUID:
491  {
492  struct request_get_uuid *r =
493  (struct request_get_uuid *)request;
494  get_uuid_string(r->server_uuid);
495  }
496  break;
497  default:
498  request_error(request, "Unknown request type");
499  break;
500  }
501  if (rc != 1)
502  pthread_cond_signal(request->condition);
503  }
504 
505  pthread_mutex_unlock(&mutex);
506  DEBUG_MSG(LOG_DEBUG, "process_requests unlocked mutex");
507 }
508 
522 static void report_flow(struct flow* flow, int type)
523 {
524  DEBUG_MSG(LOG_DEBUG, "report_flow called for flow %d (type %d)",
525  flow->id, type);
526  struct report* report =
527  (struct report*)malloc(sizeof(struct report));
528 
529  report->id = flow->id;
531  report->type = type;
532 
533  if (type == INTERVAL)
535  else
537 
538  gettime(&report->end);
540 
541  /* abort if we were scheduled way to early for a interval report */
542  if (time_diff(&report->begin,&report->end) < 0.2 *
544  free(report);
545  return;
546  }
547 
558 
568 
569  /* Currently this will only contain useful information on Linux
570  * and FreeBSD */
572 
573  if (flow->fd != -1) {
574  /* Get latest MTU */
575  flow->pmtu = get_pmtu(flow->fd);
576  report->pmtu = flow->pmtu;
577  if (type == FINAL)
578  report->imtu = get_imtu(flow->fd);
579  else
580  report->imtu = 0;
581  } else {
582  report->imtu = 0;
583  report->pmtu = 0;
584  }
585  /* Add status flags to report */
586  report->status = 0;
587 
588  if (flow->statistics[type].bytes_read == 0) {
589  if (flow_in_delay(&report->end, flow, READ))
590  report->status |= 'd';
591  else if (flow_sending(&report->end, flow, READ))
592  report->status |= 'l';
593  else if (flow->settings.duration[READ] == 0)
594  report->status |= 'o';
595  else
596  report->status |= 'f';
597  } else {
598  if (!flow_sending(&report->end, flow, READ) && !flow->finished)
599  report->status |= 'c';
600  else
601  report->status |= 'n';
602  }
603  report->status <<= 8;
604 
605  if (flow->statistics[type].bytes_written == 0) {
606  if (flow_in_delay(&report->end, flow, WRITE))
607  report->status |= 'd';
608  else if (flow_sending(&report->end, flow, WRITE))
609  report->status |= 'l';
610  else if (flow->settings.duration[WRITE] == 0)
611  report->status |= 'o';
612  else
613  report->status |= 'f';
614  } else {
615  if (!flow_sending(&report->end, flow, WRITE) && !flow->finished)
616  report->status |= 'c';
617  else
618  report->status |= 'n';
619  }
620 
621  /* New report interval, reset old data */
622  if (type == INTERVAL) {
625 
628 
631 
632  flow->statistics[INTERVAL].rtt_min = FLT_MAX;
633  flow->statistics[INTERVAL].rtt_max = FLT_MIN;
634  flow->statistics[INTERVAL].rtt_sum = 0.0F;
635  flow->statistics[INTERVAL].iat_min = FLT_MAX;
636  flow->statistics[INTERVAL].iat_max = FLT_MIN;
637  flow->statistics[INTERVAL].iat_sum = 0.0F;
638  flow->statistics[INTERVAL].delay_min = FLT_MAX;
639  flow->statistics[INTERVAL].delay_max = FLT_MIN;
641  }
642 
644  DEBUG_MSG(LOG_DEBUG, "report_flow finished for flow %d (type %d)",
645  flow->id, type);
646 }
647 
648 /* Fills the given _fg_tcp_info with the values of the OS specific tcp_info,
649  * returns 0 on success */
650 int get_tcp_info(struct flow *flow, struct fg_tcp_info *info)
651 {
652 #ifdef HAVE_TCP_INFO
653  struct tcp_info tmp_info;
654  socklen_t info_len = sizeof(tmp_info);
655  int rc;
656  memset(info, 0, sizeof(struct fg_tcp_info));
657 
658  rc = getsockopt(flow->fd, IPPROTO_TCP, TCP_INFO, &tmp_info, &info_len);
659  if (rc == -1) {
660  warn("getsockopt() failed");
661  return -1;
662  }
663  #define CPY_INFO_MEMBER(a) info->a = (int) tmp_info.a;
664  CPY_INFO_MEMBER(tcpi_snd_cwnd);
665  CPY_INFO_MEMBER(tcpi_snd_ssthresh);
666  CPY_INFO_MEMBER(tcpi_rtt);
667  CPY_INFO_MEMBER(tcpi_rttvar);
668  CPY_INFO_MEMBER(tcpi_rto);
669  CPY_INFO_MEMBER(tcpi_snd_mss);
670 
671  /* TODO FreeBSD 9.1 doesn't fill these members, but maybe FreeBSD 10.0
672  * will fill it, so get rid of this ifdef */
673 #ifdef __LINUX__
674  CPY_INFO_MEMBER(tcpi_backoff);
675  CPY_INFO_MEMBER(tcpi_unacked);
676  CPY_INFO_MEMBER(tcpi_sacked);
677  CPY_INFO_MEMBER(tcpi_lost);
678  CPY_INFO_MEMBER(tcpi_retrans);
679  CPY_INFO_MEMBER(tcpi_retransmits);
680  CPY_INFO_MEMBER(tcpi_fackets);
681  CPY_INFO_MEMBER(tcpi_reordering);
682  CPY_INFO_MEMBER(tcpi_ca_state);
683 #endif /* __LINUX__ */
684 #else /* HAVE_TCP_INFO */
686  memset(info, 0, sizeof(struct fg_tcp_info));
687 #endif /* HAVE_TCP_INFO */
688  return 0;
689 }
690 
691 static void timer_check()
692 {
693  struct timespec now;
694 
695  if (!started)
696  return;
697 
698  gettime(&now);
699  const struct list_node *node = fg_list_front(&flows);
700  while (node) {
701  struct flow *flow = node->data;
702  node = node->next;
703 
704  DEBUG_MSG(LOG_DEBUG, "processing timer_check() for flow %d",
705  flow->id);
706 
708  continue;
709 
710  if (!time_is_after(&now, &flow->next_report_time))
711  continue;
712 
713  /* On Other OSes than Linux or FreeBSD, tcp_info will contain all zeroes */
714  if (flow->fd != -1)
718  ? 0 : 1;
720 
721  do {
724  } while (time_is_after(&now, &flow->next_report_time));
725  }
726  DEBUG_MSG(LOG_DEBUG, "finished timer_check()");
727 }
728 
729 static void process_select(fd_set *rfds, fd_set *wfds, fd_set *efds)
730 {
731  const struct list_node *node = fg_list_front(&flows);
732  while (node) {
733  struct flow *flow = node->data;
734  node = node->next;
735 
736  DEBUG_MSG(LOG_DEBUG, "processing pselect() for flow %d",
737  flow->id);
738 
739  if (flow->listenfd_data != -1 &&
740  FD_ISSET(flow->listenfd_data, rfds)) {
741  DEBUG_MSG(LOG_DEBUG, "ready for accept");
742  if (flow->state == GRIND_WAIT_ACCEPT) {
743  if (accept_data(flow) == -1) {
744  DEBUG_MSG(LOG_ERR, "accept_data() "
745  "failed");
746  goto remove;
747  }
748  }
749  }
750 
751  if (flow->fd != -1) {
752  if (FD_ISSET(flow->fd, efds)) {
753  int error_number, rc;
754  socklen_t error_number_size =
755  sizeof(error_number);
756  DEBUG_MSG(LOG_DEBUG, "sock of flow %d in efds",
757  flow->id);
758  rc = getsockopt(flow->fd, SOL_SOCKET,
759  SO_ERROR,
760  (void *)&error_number,
761  &error_number_size);
762  if (rc == -1) {
763  warn("failed to get errno for"
764  "non-blocking connect");
765  goto remove;
766  }
767  if (error_number != 0) {
768  warnc(error_number, "connect");
769  goto remove;
770  }
771  }
772  if (FD_ISSET(flow->fd, wfds))
773  if (write_data(flow) == -1) {
774  DEBUG_MSG(LOG_ERR, "write_data() failed");
775  goto remove;
776  }
777 
778  if (FD_ISSET(flow->fd, rfds))
779  if (read_data(flow) == -1) {
780  DEBUG_MSG(LOG_ERR, "read_data() failed");
781  goto remove;
782  }
783  }
784  continue;
785 remove:
786  if (flow->fd != -1) {
790  ? 0 : 1;
791  }
792  flow->pmtu = get_pmtu(flow->fd);
794  uninit_flow(flow);
795  DEBUG_MSG(LOG_ERR, "removing flow %d", flow->id);
796  remove_flow(flow);
797  }
798 }
799 
800 void* daemon_main(void* ptr __attribute__((unused)))
801 {
802  struct timespec timeout;
803  for (;;) {
804  int need_timeout = prepare_fds();
805 
806  timeout.tv_sec = 0;
807  timeout.tv_nsec = DEFAULT_SELECT_TIMEOUT;
808  DEBUG_MSG(LOG_DEBUG, "calling pselect() need_timeout: %i",
809  need_timeout);
810  int rc = pselect(maxfd + 1, &rfds, &wfds, &efds,
811  need_timeout ? &timeout : 0, NULL);
812  if (rc < 0) {
813  if (errno == EINTR)
814  continue;
815  crit("pselect() failed");
816  }
817  DEBUG_MSG(LOG_DEBUG, "pselect() finished");
818 
819  if (FD_ISSET(daemon_pipe[0], &rfds))
821 
822  timer_check();
824  }
825 }
826 
827 void add_report(struct report* report)
828 {
829  DEBUG_MSG(LOG_DEBUG, "add_report trying to lock mutex");
830  pthread_mutex_lock(&mutex);
831  DEBUG_MSG(LOG_DEBUG, "add_report aquired mutex");
832  /* Do not keep too much data */
833  if (pending_reports >= 250 && report->type != FINAL) {
834  free(report);
835  pthread_mutex_unlock(&mutex);
836  return;
837  }
838 
839  report->next = 0;
840 
841  if (reports_last)
843  else
844  reports = report;
845 
847  pending_reports++;
848 
849  pthread_mutex_unlock(&mutex);
850  DEBUG_MSG(LOG_DEBUG, "add_report unlocked mutex");
851 }
852 
853 struct report* get_reports(int *has_more)
854 {
855  const unsigned max_reports = 50;
856 
857  struct report* ret;
858  DEBUG_MSG(LOG_DEBUG, "get_reports trying to lock mutex");
859  pthread_mutex_lock(&mutex);
860  DEBUG_MSG(LOG_DEBUG, "get_reports aquired mutex");
861  ret = reports;
862 
863  if (pending_reports <= max_reports) {
864  *has_more = 0;
865  pending_reports = 0;
866  reports = NULL;
867  reports_last = 0;
868  } else {
869  /* Split off first 50 items */
870  struct report* tmp;
871  for (unsigned i = 0; i < max_reports - 1; i++)
872  reports = reports->next;
873  tmp = reports->next;
874  reports->next = 0;
875  reports = tmp;
876 
877  pending_reports -= max_reports;
878  *has_more = 1;
879  }
880 
881  pthread_mutex_unlock(&mutex);
882  DEBUG_MSG(LOG_DEBUG, "get_reports unlocked mutex");
883  return ret;
884 }
885 
896 void init_flow(struct flow* flow, int is_source)
897 {
898  memset(flow, 0, sizeof(struct flow));
899 
900  /* flow id is given by controller */
901  flow->id = -1;
902  flow->endpoint = is_source ? SOURCE : DESTINATION;
904  flow->fd = -1;
905  flow->listenfd_data = -1;
906 
909 
910  flow->finished[READ] = flow->finished[WRITE] = 0;
911 
912  flow->addr = 0;
913 
914  foreach(int *i, INTERVAL, FINAL) {
915  flow->statistics[*i].bytes_read = 0;
916  flow->statistics[*i].bytes_written = 0;
917 
922 
923  flow->statistics[*i].rtt_min = FLT_MAX;
924  flow->statistics[*i].rtt_max = FLT_MIN;
925  flow->statistics[*i].rtt_sum = 0.0F;
926  flow->statistics[*i].iat_min = FLT_MAX;
927  flow->statistics[*i].iat_max = FLT_MIN;
928  flow->statistics[*i].iat_sum = 0.0F;
929  flow->statistics[*i].delay_min = FLT_MAX;
930  flow->statistics[*i].delay_max = FLT_MIN;
931  flow->statistics[*i].delay_sum = 0.0F;
932  }
933 
934  DEBUG_MSG(LOG_NOTICE, "called init flow %d", flow->id);
935 }
936 
937 static int write_data(struct flow *flow)
938 {
939  int rc = 0;
940  int response_block_size = 0;
941  double interpacket_gap = .0;
942  for (;;) {
943 
944  /* fill buffer with new data */
945  if (flow->current_block_bytes_written == 0) {
948  response_block_size = next_response_block_size(flow);
949  /* serialize data:
950  * this_block_size */
951  ((struct block *)flow->write_block)->this_block_size =
953  /* requested_block_size */
954  ((struct block *)flow->write_block)->request_block_size =
955  htonl(response_block_size);
956  /* write rtt data (will be echoed back by the receiver
957  * in the response packet) */
958  gettime((struct timespec *)
959  (flow->write_block + 2 * (sizeof (int32_t))));
960 
961  DEBUG_MSG(LOG_DEBUG, "wrote new request data to out "
962  "buffer bs = %d, rqs = %d, on flow %d",
963  ntohl(((struct block *)flow->write_block)->this_block_size),
964  ntohl(((struct block *)flow->write_block)->request_block_size),
965  flow->id);
966  }
967 
968  rc = write(flow->fd,
969  flow->write_block +
973 
974  if (rc == -1) {
975  if (errno == EAGAIN) {
976  logging(LOG_WARNING, "write queue limit hit for "
977  "flow %d", flow->id);
978  break;
979  }
980  DEBUG_MSG(LOG_WARNING, "write() returned %d on flow %d, "
981  "fd %d: %s", rc, flow->id, flow->fd,
982  strerror(errno));
983  flow_error(flow, "premature end of test: %s",
984  strerror(errno));
985  return rc;
986  }
987 
988  if (rc == 0) {
989  DEBUG_MSG(LOG_CRIT, "flow %d sent zero bytes. what "
990  "does that mean?", flow->id);
991  return rc;
992  }
993 
994  DEBUG_MSG(LOG_DEBUG, "flow %d sent %d request bytes of %u "
995  "(before = %u)", flow->id, rc,
998 
999  foreach(int *i, INTERVAL, FINAL)
1000  flow->statistics[*i].bytes_written += rc;
1001 
1003 
1008  /* we just finished writing a block */
1011 
1012  foreach(int *i, INTERVAL, FINAL)
1014 
1015  interpacket_gap = next_interpacket_gap(flow);
1016 
1017  /* if we calculated a non-zero packet add relative time
1018  * to the next write stamp which is then checked in the
1019  * select call */
1020  if (interpacket_gap) {
1022  interpacket_gap);
1025  char timestamp[30] = "";
1027  timestamp, sizeof(timestamp), true);
1028  DEBUG_MSG(LOG_WARNING, "incipient "
1029  "congestion on flow %u new "
1030  "block scheduled for %s, "
1031  "%.6lfs before now",
1032  flow->id, timestamp,
1036  if (flow->congestion_counter >
1037  CONGESTION_LIMIT &&
1039  return -1;
1040  }
1041  }
1042  if (flow->settings.cork && toggle_tcp_cork(flow->fd) == -1)
1043  DEBUG_MSG(LOG_NOTICE, "failed to recork test "
1044  "socket for flow %d: %s",
1045  flow->id, strerror(errno));
1046  }
1047 
1048  if (!flow->settings.pushy)
1049  break;
1050  }
1051  return 0;
1052 }
1053 
1054 static inline int try_read_n_bytes(struct flow *flow, int bytes)
1055 {
1056  int rc;
1057  struct iovec iov;
1058  struct msghdr msg;
1059 /* we only read out of band data for debugging purpose */
1060 #ifdef DEBUG
1061  char cbuf[512];
1062  struct cmsghdr *cmsg;
1063 #else /* DEBUG */
1064  char cbuf[16];
1065 #endif /* DEBUG */
1066  iov.iov_base = flow->read_block +
1068  iov.iov_len = bytes;
1069  /* no name required */
1070  msg.msg_name = NULL;
1071  msg.msg_namelen = 0;
1072  msg.msg_iov = &iov;
1073  msg.msg_iovlen = 1;
1074  msg.msg_control = cbuf;
1075  msg.msg_controllen = sizeof(cbuf);
1076 
1077  rc = recvmsg(flow->fd, &msg, 0);
1078 
1079  DEBUG_MSG(LOG_DEBUG, "tried reading %d bytes, got %d", bytes, rc);
1080 
1081  if (rc == -1) {
1082  if (errno == EAGAIN)
1083  flow_error(flow, "Premature end of test: %s",
1084  strerror(errno));
1085  return -1;
1086  }
1087 
1088  if (rc == 0) {
1089  DEBUG_MSG(LOG_ERR, "server shut down test socket of flow %d",
1090  flow->id);
1091  if (!flow->finished[READ] || !flow->settings.shutdown)
1092  warnx("premature shutdown of server flow");
1093  flow->finished[READ] = 1;
1094  return -1;
1095  }
1096 
1097  DEBUG_MSG(LOG_DEBUG, "flow %d received %u bytes", flow->id, rc);
1098 
1100 
1101  foreach(int *i, INTERVAL, FINAL)
1102  flow->statistics[*i].bytes_read += rc;
1103 
1104 #ifdef DEBUG
1105  for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg))
1106  DEBUG_MSG(LOG_NOTICE, "flow %d received cmsg: type = %u, len = %u",
1107  flow->id, cmsg->cmsg_type, (socklen_t) cmsg->cmsg_len);
1108 #endif /* DEBUG */
1109 
1110  return rc;
1111 }
1112 
1113 static int read_data(struct flow *flow)
1114 {
1115  int rc = 0;
1116  int optint = 0;
1117  int requested_response_block_size = 0;
1118 
1119  for (;;) {
1120  /* make sure to read block header for new block */
1122  rc = try_read_n_bytes(flow,
1125  break;
1126  }
1127  /* parse data and update status */
1128 
1129  /* parse and check current block size for validity */
1130  optint = ntohl( ((struct block *)flow->read_block)->this_block_size );
1131  if (optint >= MIN_BLOCK_SIZE &&
1132  optint <= flow->settings.maximum_block_size )
1133  flow->current_read_block_size = optint;
1134  else
1135  logging(LOG_WARNING, "flow %d parsed illegal cbs %d, "
1136  "ignoring (max: %d)", flow->id, optint,
1138 
1139  /* parse and check current request size for validity */
1140  optint = ntohl( ((struct block *)flow->read_block)->request_block_size );
1141  if (optint == -1 || optint == 0 ||
1142  (optint >= MIN_BLOCK_SIZE &&
1143  optint <= flow->settings.maximum_block_size))
1144  requested_response_block_size = optint;
1145  else
1146  logging(LOG_WARNING, "flow %d parsed illegal qbs %d, "
1147  "ignoring (max: %d)", flow->id, optint,
1149 #ifdef DEBUG
1150  if (requested_response_block_size == -1) {
1151  DEBUG_MSG(LOG_NOTICE, "processing response block on "
1152  "flow %d size: %d", flow->id,
1154  } else {
1155  DEBUG_MSG(LOG_NOTICE, "processing request block on "
1156  "flow %d size: %d, request: %d", flow->id,
1158  requested_response_block_size);
1159  }
1160 #endif /* DEBUG */
1161  /* read rest of block, if we have more to read */
1164  rc += try_read_n_bytes(flow,
1167 
1170  assert(flow->current_block_bytes_read ==
1173 
1174  /* TODO process_rtt(), process_iat(), and
1175  * process_delay () call all gettime().
1176  * Quite inefficient... */
1177 
1178  if (requested_response_block_size == -1) {
1179  /* this is a response block, consider DATA as
1180  * RTT */
1181  foreach(int *i, INTERVAL, FINAL)
1183  process_rtt(flow);
1184  } else {
1185  /* this is a request block, calculate IAT */
1186  foreach(int *i, INTERVAL, FINAL)
1188  process_iat(flow);
1190 
1191  /* send response if requested */
1192  if (requested_response_block_size >=
1193  (signed)MIN_BLOCK_SIZE && !flow->finished[READ])
1195  requested_response_block_size);
1196  }
1197  }
1198  if (!flow->settings.pushy)
1199  break;
1200  }
1201  return rc;
1202 }
1203 
1204 static void process_rtt(struct flow* flow)
1205 {
1206  double current_rtt = .0;
1207  struct timespec now;
1208  struct timespec *data = (struct timespec *)
1209  (flow->read_block + 2*(sizeof (int32_t)));
1210 
1211  gettime(&now);
1212  current_rtt = time_diff(data, &now);
1213 
1214  if (current_rtt < 0) {
1215  logging(LOG_CRIT, "received malformed rtt block of flow %d "
1216  "(rtt = %.3lfms), ignoring", flow->id, current_rtt * 1e3);
1217  current_rtt = NAN;
1218  }
1219 
1220  flow->last_block_read = now;
1221 
1222  if (!isnan(current_rtt)) {
1223  foreach(int *i, INTERVAL, FINAL) {
1224  ASSIGN_MIN(flow->statistics[*i].rtt_min, current_rtt);
1225  ASSIGN_MAX(flow->statistics[*i].rtt_max, current_rtt);
1226  flow->statistics[*i].rtt_sum += current_rtt;
1227  }
1228  }
1229 
1230  DEBUG_MSG(LOG_NOTICE, "processed RTT of flow %d (%.3lfms)",
1231  flow->id, current_rtt * 1e3);
1232 }
1233 
1234 static void process_iat(struct flow* flow)
1235 {
1236  double current_iat = .0;
1237  struct timespec now;
1238 
1239  gettime(&now);
1240 
1241  if (flow->last_block_read.tv_sec ||
1242  flow->last_block_read.tv_nsec)
1243  current_iat = time_diff(&flow->last_block_read, &now);
1244  else
1245  current_iat = NAN;
1246 
1247  if (current_iat < 0) {
1248  logging(LOG_CRIT, "calculated malformed iat of flow %d "
1249  "(iat = %.3lfms) (clock skew?), ignoring",
1250  flow->id, current_iat * 1e3);
1251  current_iat = NAN;
1252  }
1253 
1254  flow->last_block_read = now;
1255 
1256  if (!isnan(current_iat)) {
1257  foreach(int *i, INTERVAL, FINAL) {
1258  ASSIGN_MIN(flow->statistics[*i].iat_min, current_iat);
1259  ASSIGN_MAX(flow->statistics[*i].iat_max, current_iat);
1260  flow->statistics[*i].iat_sum += current_iat;
1261  }
1262  }
1263  DEBUG_MSG(LOG_NOTICE, "processed IAT of flow %d (%.3lfms)",
1264  flow->id, current_iat * 1e3);
1265 }
1266 
1267 static void process_delay(struct flow* flow)
1268 {
1269  double current_delay = .0;
1270  struct timespec now;
1271  struct timespec *data = (struct timespec *)
1272  (flow->read_block + 2*(sizeof (int32_t)));
1273 
1274  gettime(&now);
1275  current_delay = time_diff(data, &now);
1276 
1277  if (current_delay < 0) {
1278  logging(LOG_NOTICE, "calculated malformed delay of flow "
1279  "%d (rtt = %.3lfms) (clocks out-of-sync?), ignoring",
1280  flow->id, current_delay * 1e3);
1281  current_delay = NAN;
1282  }
1283 
1284  if (!isnan(current_delay)) {
1285  foreach(int *i, INTERVAL, FINAL) {
1287  current_delay);
1289  current_delay);
1290  flow->statistics[*i].delay_sum += current_delay;
1291  }
1292  }
1293 
1294  DEBUG_MSG(LOG_NOTICE, "processed delay of flow %d (%.3lfms)",
1295  flow->id, current_delay * 1e3);
1296 }
1297 
1298 static void send_response(struct flow* flow, int requested_response_block_size)
1299 {
1300  int rc;
1301  int try = 0;
1302 
1304 
1305  /* write requested block size as current size */
1306  ((struct block *)flow->write_block)->this_block_size =
1307  htonl(requested_response_block_size);
1308  /* rqs = -1 indicates response block */
1309  ((struct block *)flow->write_block)->request_block_size = htonl(-1);
1310  /* copy rtt data from received block to response block (echo back) */
1311  ((struct block *)flow->write_block)->data =
1312  ((struct block *)flow->read_block)->data;
1313  /* workaround for 64bit sender and 32bit receiver: we check if the
1314  * timespec is 64bit and then echo the missing 32bit back, too */
1315  if ((((struct block *)flow->write_block)->data.tv_sec) ||
1316  ((struct block *)flow->write_block)->data.tv_nsec)
1317  ((struct block *)flow->write_block)->data2 =
1318  ((struct block *)flow->read_block)->data2;
1319 
1320  DEBUG_MSG(LOG_DEBUG, "wrote new response data to out buffer bs = %d, "
1321  "rqs = %d on flow %d",
1322  ntohl(((struct block *)flow->write_block)->this_block_size),
1323  ntohl(((struct block *)flow->write_block)->request_block_size),
1324  flow->id);
1325 
1326  /* send data out until block is finished (or abort if 0 zero bytes are
1327  * send CONGESTION_LIMIT times) */
1328  for (;;) {
1329  rc = write(flow->fd,
1331  requested_response_block_size -
1333 
1334  DEBUG_MSG(LOG_NOTICE, "send %d bytes response (rqs %d) on flow "
1335  "%d", rc, requested_response_block_size,flow->id);
1336 
1337  if (rc == -1) {
1338  if (errno == EAGAIN) {
1339  DEBUG_MSG(LOG_DEBUG, "%s, still trying to send "
1340  "response block (write queue hit "
1341  "limit)", strerror(errno));
1342  try++;
1343  if (try >= CONGESTION_LIMIT &&
1345  logging(LOG_WARNING, "tried to send "
1346  "response block %d times without "
1347  "success, dropping (%s)",
1348  try, strerror(errno));
1349  break;
1350  }
1351  } else {
1352  logging(LOG_WARNING, "premature end of test: "
1353  "%s, abort flow", strerror(errno));
1354  flow->finished[READ] = 1;
1355  break;
1356  }
1357  } else {
1359  foreach(int *i, INTERVAL, FINAL)
1360  flow->statistics[*i].bytes_written += rc;
1361 
1363  (unsigned)requested_response_block_size) {
1365  (unsigned)requested_response_block_size);
1366  /* just finish sending response block */
1369  foreach(int *i, INTERVAL, FINAL)
1371  break;
1372  }
1373  }
1374  }
1375 }
1376 
1377 
1379 {
1380  for (int i = 0; i < flow->settings.num_extra_socket_options; i++) {
1381  int level, res;
1382  const struct extra_socket_options *option =
1384 
1385  switch (option->level) {
1386  case level_sol_socket:
1387  level = SOL_SOCKET;
1388  break;
1389  case level_sol_tcp:
1390  level = SOL_TCP;
1391  break;
1392  case level_ipproto_ip:
1393  level = IPPROTO_IP;
1394  break;
1395  case level_ipproto_sctp:
1396  level = IPPROTO_SCTP;
1397  break;
1398  case level_ipproto_tcp:
1399  level = IPPROTO_TCP;
1400  break;
1401  case level_ipproto_udp:
1402  level = IPPROTO_UDP;
1403  break;
1404  default:
1405  flow_error(flow, "Unknown socket option level: %d",
1406  option->level);
1407  return -1;
1408  }
1409 
1410  res = setsockopt(flow->fd, level, option->optname,
1411  option->optval, option->optlen);
1412 
1413  if (res == -1) {
1414  flow_error(flow, "Unable to set socket option %d: %s",
1415  option->optname, strerror(errno));
1416  return -1;
1417  }
1418  }
1419 
1420  return 0;
1421 }
1422 
1423 /* Set the TCP options on the data socket */
1425 {
1427 
1428  if (*flow->settings.cc_alg &&
1430  flow_error(flow, "Unable to set congestion control "
1431  "algorithm: %s", strerror(errno));
1432  return -1;
1433  }
1434  if (flow->settings.elcn &&
1435  set_so_elcn(flow->fd, flow->settings.elcn) == -1) {
1436  flow_error(flow, "Unable to set TCP_ELCN: %s",
1437  strerror(errno));
1438  return -1;
1439  }
1440  if (flow->settings.lcd && set_so_lcd(flow->fd) == -1) {
1441  flow_error(flow, "Unable to set TCP_LCD: %s",
1442  strerror(errno));
1443  return -1;
1444  }
1445  if (flow->settings.cork && set_tcp_cork(flow->fd) == -1) {
1446  flow_error(flow, "Unable to set TCP_CORK: %s",
1447  strerror(errno));
1448  return -1;
1449  }
1450  if (flow->settings.so_debug && set_so_debug(flow->fd) == -1) {
1451  flow_error(flow, "Unable to set SO_DEBUG: %s",
1452  strerror(errno));
1453  return -1;
1454  }
1455  if (flow->settings.mtcp && set_tcp_mtcp(flow->fd) == -1) {
1456  flow_error(flow, "Unable to set TCP_MTCP: %s",
1457  strerror(errno));
1458  return -1;
1459  }
1460  if (flow->settings.nonagle && set_tcp_nodelay(flow->fd) == -1) {
1461  flow_error(flow, "Unable to set TCP_NODELAY: %s",
1462  strerror(errno));
1463  return -1;
1464  }
1465  if (flow->settings.route_record && set_route_record(flow->fd) == -1) {
1466  flow_error(flow, "Unable to set route record option: %s",
1467  strerror(errno));
1468  return -1;
1469  }
1470  if (flow->settings.dscp &&
1471  set_dscp(flow->fd, flow->settings.dscp) == -1) {
1472  flow_error(flow, "Unable to set DSCP value: %s",
1473  strerror(errno));
1474  return -1;
1475  }
1476  if (flow->settings.ipmtudiscover &&
1477  set_ip_mtu_discover(flow->fd) == -1) {
1478  flow_error(flow, "Unable to set IP_MTU_DISCOVER value: %s",
1479  strerror(errno));
1480  return -1;
1481  }
1482  if (apply_extra_socket_options(flow) == -1)
1483  return -1;
1484 
1485  return 0;
1486 }
1487 
1488 /* Dispatch an incoming request to daemon thread */
1489 int dispatch_request(struct request *request, int type)
1490 {
1491  pthread_cond_t cond;
1492 
1493  request->error = NULL;
1494  request->type = type;
1495  request->next = NULL;
1496 
1497  /* Create synchronization mutex */
1498  if (pthread_cond_init(&cond, NULL)) {
1499  request_error(request, "Could not create synchronization mutex");
1500  return -1;
1501  }
1502  request->condition = &cond;
1503 
1504  pthread_mutex_lock(&mutex);
1505 
1506  if (!requests) {
1507  requests = request;
1509  } else {
1512  }
1513  if (write(daemon_pipe[1], &type, 1) != 1) /* Doesn't matter what we write */
1514  return -1;
1515  /* Wait until the daemon thread has processed the request */
1516  pthread_cond_wait(&cond, &mutex);
1517 
1518  pthread_mutex_unlock(&mutex);
1519 
1520  if (request->error)
1521  return -1;
1522 
1523  return 0;
1524 }
1525 
1536 void get_uuid_string(char *uuid_str)
1537 {
1538  uuid_t uuid;
1539  static char server_uuid[38] = "";
1540 
1541  if (!strlen(server_uuid)) {
1542  uuid_generate_time(uuid);
1543  uuid_unparse(uuid,uuid_str);
1544  memset(server_uuid,0,sizeof(server_uuid));
1545  strcpy(server_uuid,uuid_str);
1546  return;
1547  }
1548  strcpy(uuid_str,server_uuid);
1549 }
DEBUG_MSG
#define DEBUG_MSG(LVL, MSG,...)
Print debug message to standard error.
Definition: debug.h:49
flow::statistics::response_blocks_read
unsigned response_blocks_read
Definition: daemon.h:132
FINAL
@ FINAL
Final report.
Definition: common.h:116
fg_pcap.h
Packet capture support for the Flowgrind daemon.
REQUEST_STOP_FLOW
#define REQUEST_STOP_FLOW
Definition: daemon.h:176
level_sol_socket
@ level_sol_socket
Definition: common.h:121
INTERVAL
@ INTERVAL
Intermediated interval report.
Definition: common.h:114
report::id
int id
Definition: common.h:287
prepare_wfds
static void prepare_wfds(struct timespec *now, struct flow *flow, fd_set *wfds)
Definition: daemon.c:199
ASSIGN_MAX
#define ASSIGN_MAX(s, c)
Assign value if it's greater than current one.
Definition: fg_definitions.h:68
apply_extra_socket_options
int apply_extra_socket_options(struct flow *flow)
Definition: daemon.c:1378
WRITE
@ WRITE
Write operation.
Definition: common.h:106
rfds
fd_set rfds
Definition: daemon.c:96
flow::statistics::delay_sum
double delay_sum
Accumulated one-way delay.
Definition: daemon.h:148
flow::current_read_block_size
unsigned current_read_block_size
Definition: daemon.h:101
daemon_main
void * daemon_main(void *ptr __attribute__((unused)))
Definition: daemon.c:800
time_add
void time_add(struct timespec *tp, double seconds)
Add an amount of time seconds to a specific point in time tp.
Definition: fg_time.c:136
request_get_status
Definition: daemon.h:247
fg_log.h
flow_settings::random_seed
unsigned random_seed
Random seed to use (default: read /dev/urandom) (option -J).
Definition: common.h:223
flow::statistics::iat_min
double iat_min
Minimum interarrival time.
Definition: daemon.h:138
fg_list_size
size_t fg_list_size(struct linked_list *const list)
Returns the number of elements in the list.
Definition: fg_list.c:211
report::rtt_max
double rtt_max
Maximum round-trip time.
Definition: common.h:323
flow::connect_called
char connect_called
Definition: daemon.h:111
REQUEST_GET_UUID
#define REQUEST_GET_UUID
Definition: daemon.h:178
time_diff
double time_diff(const struct timespec *tp1, const struct timespec *tp2)
Returns the time difference between two the specific points in time tp1 and tp2.
Definition: fg_time.c:95
flow
Definition: daemon.h:73
request_error
void request_error(struct request *request, const char *fmt,...)
Definition: daemon.c:132
level_ipproto_udp
@ level_ipproto_udp
Definition: common.h:126
add_flow_source
static xmlrpc_value * add_flow_source(xmlrpc_env *const env, xmlrpc_value *const param_array, void *const user_data)
Prepare data connection for source endpoint.
Definition: fg_rpc_server.c:62
report::rtt_sum
double rtt_sum
Accumulated round-trip time.
Definition: common.h:325
accept_data
int accept_data(struct flow *flow)
Definition: destination.c:250
crit
#define crit(...)
To report an critical error w/ the corresponding system error message.
Definition: fg_error.h:36
get_reports
struct report * get_reports(int *has_more)
Definition: daemon.c:853
flow_settings::dscp
int dscp
DSCP value for TOS byte (option -D).
Definition: common.h:244
report::delay_max
double delay_max
Maximum one-way delay.
Definition: common.h:317
fg_list_remove
int fg_list_remove(struct linked_list *const list, const void *const data)
Removes from the list the first element whose data points to data.
Definition: fg_list.c:65
time_is_after
bool time_is_after(const struct timespec *tp1, const struct timespec *tp2)
Returns true if second point in time tp2 is chronologically after the first point in time tp1.
Definition: fg_time.c:110
set_flow_tcp_options
int set_flow_tcp_options(struct flow *flow)
Definition: daemon.c:1424
report::request_blocks_written
unsigned request_blocks_written
Definition: common.h:302
DEFAULT_SELECT_TIMEOUT
#define DEFAULT_SELECT_TIMEOUT
Time select() will block waiting for a file descriptor to become ready.
Definition: daemon.h:51
next_response_block_size
int next_response_block_size(struct flow *flow)
Definition: trafgen.c:143
logging
void logging(int priority, const char *fmt,...)
Definition: fg_log.c:69
flow::statistics::rtt_sum
double rtt_sum
Accumulated round-trip time.
Definition: daemon.h:154
set_non_blocking
int set_non_blocking(int fd)
Definition: fg_socket.c:172
report::bytes_read
unsigned long long bytes_read
Definition: common.h:295
report::iat_sum
double iat_sum
Accumulated inter-arrival time.
Definition: common.h:313
flow_settings::extra_socket_options
struct flow_settings::extra_socket_options extra_socket_options[MAX_EXTRA_SOCKET_OPTIONS]
flow::statistics::tcp_info
struct fg_tcp_info tcp_info
Definition: daemon.h:157
READ
@ READ
Read operation.
Definition: common.h:108
fg_error.h
Error-reporting routines used by Flowgrind.
UNUSED_ARGUMENT
#define UNUSED_ARGUMENT(x)
Suppress warning for unused argument.
Definition: fg_definitions.h:38
DESTINATION
@ DESTINATION
Endpoint that accepts the connection.
Definition: common.h:100
flow::statistics::delay_max
double delay_max
Maximum one-way delay.
Definition: daemon.h:146
flow_settings::duration
double duration[2]
Duration of flow in seconds (option -T).
Definition: common.h:190
flow::source_settings
struct flow_source_settings source_settings
Definition: daemon.h:84
warnx
#define warnx(...)
To report a warning w/ a system error message.
Definition: fg_error.h:54
flow::pmtu
int pmtu
Definition: daemon.h:114
flow_settings::cork
int cork
Sets SO_DEBUG on test socket (option -O).
Definition: common.h:232
destination.h
Routines used to setup a Flowgrind destination for a test.
report::delay_min
double delay_min
Minimum one-way delay.
Definition: common.h:315
start_flows
static void start_flows(struct request_start_flows *request)
Definition: daemon.c:333
report_flow
static void report_flow(struct flow *flow, int type)
To prepare a report, report type is either INTERVAL or FINAL.
Definition: daemon.c:522
list_node::data
void * data
Pointer to user defined data stored with this node.
Definition: fg_list.h:38
flow::next_write_block_timestamp
struct timespec next_write_block_timestamp
Definition: daemon.h:95
init_flow
void init_flow(struct flow *flow, int is_source)
To initialize all flows to the default value.
Definition: daemon.c:896
flow_settings::cc_alg
char cc_alg[TCP_CA_NAME_MAX]
Set congestion control algorithm ALG on test socket (option -O).
Definition: common.h:236
flow::statistics::delay_min
double delay_min
Minimum one-way delay.
Definition: daemon.h:144
flow::settings
struct flow_settings settings
Definition: daemon.h:83
flow::last_report_time
struct timespec last_report_time
Definition: daemon.h:92
flow::read_block
char * read_block
Definition: daemon.h:97
report::next
struct report * next
Definition: common.h:338
efds
fd_set efds
Definition: daemon.c:96
request_stop_flow
Definition: daemon.h:227
set_dscp
int set_dscp(int fd, int dscp)
Definition: fg_socket.c:132
request_add_flow_destination
Definition: daemon.h:193
set_tcp_cork
int set_tcp_cork(int fd)
Definition: fg_socket.c:317
request_start_flows
Definition: daemon.h:220
CPY_INFO_MEMBER
#define CPY_INFO_MEMBER(a)
ASSIGN_MIN
#define ASSIGN_MIN(s, c)
Assign value if it less than current one.
Definition: fg_definitions.h:62
get_uuid_string
void get_uuid_string(char *uuid_str)
To generate daemon UUID.
Definition: daemon.c:1536
flow::statistics::bytes_written
unsigned long long bytes_written
Definition: daemon.h:125
uninit_flow
void uninit_flow(struct flow *flow)
Definition: daemon.c:165
source.h
Routines used by Flowgrind to setup the source for a test flow.
time_diff_now
double time_diff_now(const struct timespec *tp)
Returns time difference between now and the specific point in time tp.
Definition: fg_time.c:101
flow::statistics::rtt_max
double rtt_max
Maximum round-trip time.
Definition: daemon.h:152
SOL_TCP
#define SOL_TCP
Definition: daemon.c:78
request::type
char type
Definition: daemon.h:181
report
Definition: common.h:286
flow_settings::traffic_dump
int traffic_dump
Dump traffic using libpcap (option -M).
Definition: common.h:204
reports_last
struct report * reports_last
Definition: daemon.c:100
flow::current_block_bytes_read
unsigned current_block_bytes_read
Definition: daemon.h:103
dispatch_request
int dispatch_request(struct request *request, int type)
Dispatch a request to daemon loop.
Definition: daemon.c:1489
mutex
pthread_mutex_t mutex
Definition: daemon.c:93
flow_settings::ipmtudiscover
int ipmtudiscover
Set IP_MTU_DISCOVER on test socket (option -O).
Definition: common.h:246
set_tcp_nodelay
int set_tcp_nodelay(int fd)
Definition: fg_socket.c:358
set_so_debug
int set_so_debug(int fd)
Definition: fg_socket.c:366
process_iat
static void process_iat(struct flow *flow)
Definition: daemon.c:1234
flow_settings::maximum_block_size
int maximum_block_size
Application buffer size in bytes (option -U).
Definition: common.h:201
flow_source_settings::late_connect
int late_connect
Definition: daemon.h:68
set_so_elcn
int set_so_elcn(int fd, int val)
Definition: fg_socket.c:278
flow_settings::flow_control
int flow_control
Stop flow if it is experiencing local congestion (option -C).
Definition: common.h:226
flow::statistics::response_blocks_written
unsigned response_blocks_written
Definition: daemon.h:133
toggle_tcp_cork
int toggle_tcp_cork(int fd)
Definition: fg_socket.c:331
request_add_flow_source
Definition: daemon.h:206
pending_reports
unsigned pending_reports
Definition: daemon.c:101
request
Definition: daemon.h:179
report::iat_max
double iat_max
Maximum inter-arrival time.
Definition: common.h:311
flow::error
char * error
Definition: daemon.h:170
ctimespec_r
const char * ctimespec_r(const struct timespec *tp, char *buf, size_t size, bool ns)
Converts timespec struct tp into a null-terminated string.
Definition: fg_time.c:66
request::next
struct request * next
Definition: daemon.h:189
flow::current_write_block_size
unsigned current_write_block_size
Definition: daemon.h:100
GRIND_WAIT_CONNECT
@ GRIND_WAIT_CONNECT
Definition: daemon.h:56
requests_last
struct request * requests_last
Definition: daemon.c:94
GRIND_WAIT_ACCEPT
@ GRIND_WAIT_ACCEPT
Definition: daemon.h:58
prepare_fds
static int prepare_fds()
Definition: daemon.c:267
requests
struct request * requests
Definition: daemon.c:94
get_pmtu
int get_pmtu(int fd)
Definition: fg_socket.c:193
flow_settings::num_extra_socket_options
int num_extra_socket_options
Definition: common.h:262
process_delay
static void process_delay(struct flow *flow)
Definition: daemon.c:1267
flow_settings::pushy
int pushy
Do not iterate through select() to continue sending in case block size did not suffice to fill sendin...
Definition: common.h:213
flow_sending
static int flow_sending(struct timespec *now, struct flow *flow, int direction)
Definition: daemon.c:152
flow::endpoint
enum endpoint_t endpoint
Definition: daemon.h:78
init_math_functions
void init_math_functions(struct flow *flow, unsigned long seed)
Definition: fg_math.c:58
stop_flow
static void stop_flow(struct request_stop_flow *request)
Definition: daemon.c:382
flow_settings::nonagle
int nonagle
Disable nagle algorithm on test socket (option -O).
Definition: common.h:234
request::error
char * error
Definition: daemon.h:187
set_so_lcd
int set_so_lcd(int fd)
Definition: fg_socket.c:288
fg_socket.h
Routines used to manipulate socket parameters for Flowgrind.
set_ip_mtu_discover
int set_ip_mtu_discover(int fd)
Definition: fg_socket.c:300
MIN_BLOCK_SIZE
#define MIN_BLOCK_SIZE
Minium block (message) size we can send.
Definition: common.h:79
flow_in_delay
static int flow_in_delay(struct timespec *now, struct flow *flow, int direction)
Definition: daemon.c:145
flow::listenfd_data
int listenfd_data
Definition: daemon.h:81
prepare_rfds
static int prepare_rfds(struct timespec *now, struct flow *flow, fd_set *rfds)
Definition: daemon.c:233
report::bytes_written
unsigned long long bytes_written
Definition: common.h:296
process_requests
static void process_requests()
To process the request issued from the controller.
Definition: daemon.c:446
flow_settings::reporting_interval
double reporting_interval
Interval to report flow on screen (option -i).
Definition: common.h:193
flow::statistics::has_tcp_info
int has_tcp_info
Definition: daemon.h:156
write_data
static int write_data(struct flow *flow)
Definition: daemon.c:937
set_route_record
int set_route_record(int fd)
Definition: fg_socket.c:149
try_read_n_bytes
static int try_read_n_bytes(struct flow *flow, int bytes)
Definition: daemon.c:1054
SOURCE
@ SOURCE
Endpoint that opens the connection.
Definition: common.h:98
dump_dir
char * dump_dir
Definition: daemon.c:91
flow_settings::elcn
int elcn
Set TCP_ELCN (20) on test socket (option -O).
Definition: common.h:238
maxfd
int maxfd
Definition: daemon.c:97
fg_definitions.h
Common definitions used by the Flowgrind daemon, controller, and libs.
flow::statistics::request_blocks_read
unsigned request_blocks_read
Definition: daemon.h:130
fg_time.h
Timing related routines used by Flowgrind.
report::pmtu
unsigned pmtu
Discovered Path MTU.
Definition: common.h:332
flow::fd
int fd
Definition: daemon.h:80
wfds
fd_set wfds
Definition: daemon.c:96
block
Flowgrind's data block layout.
Definition: common.h:148
flow::finished
char finished[2]
Definition: daemon.h:112
flow_error
void flow_error(struct flow *flow, const char *fmt,...)
Definition: daemon.c:119
flow::last_block_read
struct timespec last_block_read
Definition: daemon.h:88
send_response
static void send_response(struct flow *flow, int requested_response_block_size)
Definition: daemon.c:1298
flow_settings::delay
double delay[2]
Delay of flow in seconds (option -Y).
Definition: common.h:188
flow::congestion_counter
unsigned congestion_counter
Definition: daemon.h:116
REQUEST_GET_STATUS
#define REQUEST_GET_STATUS
Definition: daemon.h:177
reports
struct report * reports
Definition: daemon.c:99
process_select
static void process_select(fd_set *rfds, fd_set *wfds, fd_set *efds)
Definition: daemon.c:729
report::status
int status
Definition: common.h:336
warn
#define warn(...)
To report a warning w/ the corresponding system error message.
Definition: fg_error.h:50
flow_settings::lcd
int lcd
Set TCP_LCD (21) on test socket (option -O).
Definition: common.h:240
free_all
#define free_all(...)
To free() an arbitrary number of variables.
Definition: fg_definitions.h:59
report::delay_sum
double delay_sum
Accumulated one-way delay.
Definition: common.h:319
level_ipproto_ip
@ level_ipproto_ip
Definition: common.h:123
flow::addr
struct sockaddr * addr
Definition: daemon.h:119
flow_block_scheduled
static int flow_block_scheduled(struct timespec *now, struct flow *flow)
Definition: daemon.c:160
flow::write_block
char * write_block
Definition: daemon.h:98
request::condition
pthread_cond_t * condition
Definition: daemon.h:185
request_get_uuid
structure for getting the UUID.
Definition: daemon.h:240
daemon_pipe
int daemon_pipe[2]
Definition: daemon.c:87
common.h
Data structures used by the Flowgrind daemon and controller.
report::response_blocks_written
unsigned response_blocks_written
Definition: common.h:304
flow::last_block_written
struct timespec last_block_written
Definition: daemon.h:89
free_math_functions
void free_math_functions(struct flow *flow)
Definition: fg_math.c:97
block::data
struct timespec data
Sending timestap for calculating delay and RTT.
Definition: common.h:160
flow::state
enum flow_state_t state
Definition: daemon.h:77
request_get_uuid::r
struct request r
Daemon thread process the request r.
Definition: daemon.h:242
level_ipproto_sctp
@ level_ipproto_sctp
Definition: common.h:124
get_tcp_info
int get_tcp_info(struct flow *flow, struct fg_tcp_info *info)
Definition: daemon.c:650
flow::stop_timestamp
struct timespec stop_timestamp[2]
Definition: daemon.h:87
flow::statistics::bytes_read
unsigned long long bytes_read
Definition: daemon.h:124
flow::start_timestamp
struct timespec start_timestamp[2]
Definition: daemon.h:86
level_sol_tcp
@ level_sol_tcp
Definition: common.h:122
flow::next_report_time
struct timespec next_report_time
Definition: daemon.h:93
next_interpacket_gap
double next_interpacket_gap(struct flow *flow)
Definition: trafgen.c:173
report::type
enum report_t type
Report type - either INTERVAL or FINAL report.
Definition: common.h:291
daemon_thread
pthread_t daemon_thread
Definition: daemon.c:89
dump_prefix
char * dump_prefix
Definition: daemon.c:90
flow::statistics::request_blocks_written
unsigned request_blocks_written
Definition: daemon.h:131
warnc
#define warnc(code,...)
To report a warning w/ the system error message 'code'.
Definition: fg_error.h:52
flow_settings::shutdown
int shutdown
Shutdown socket after test flow (option -N).
Definition: common.h:215
get_imtu
int get_imtu(int fd)
Definition: fg_socket.c:214
list_node
Single element in a doubly linked list.
Definition: fg_list.h:36
flow::statistics::iat_max
double iat_max
Maximum interarrival time.
Definition: daemon.h:140
next_request_block_size
int next_request_block_size(struct flow *flow)
Definition: trafgen.c:107
remove_flow
void remove_flow(struct flow *const flow)
Definition: daemon.c:191
read_data
static int read_data(struct flow *flow)
Definition: daemon.c:1113
flow::statistics::rtt_min
double rtt_min
Minimum round-trip time.
Definition: daemon.h:150
block::data2
struct timespec data2
Used to access 64bit timespec on 32bit arch.
Definition: common.h:162
add_report
void add_report(struct report *report)
Definition: daemon.c:827
report::endpoint
enum endpoint_t endpoint
Daemon endpoint - either source or destination.
Definition: common.h:289
report::imtu
unsigned imtu
Interface MTU.
Definition: common.h:334
trafgen.h
Routines used by the Flowgrind daemon for advanced traffic generation.
flow::first_report_time
struct timespec first_report_time
Definition: daemon.h:91
add_flow_destination
void add_flow_destination(struct request_add_flow_destination *request)
To set daemon flow as destination endpoint.
Definition: destination.c:158
fg_tcp_info
Definition: common.h:267
linked_list
A doubly linked list.
Definition: fg_list.h:46
config.h
flow::id
int id
Definition: daemon.h:75
list_node::next
struct list_node * next
Pointer to the previous node in the list.
Definition: fg_list.h:40
do_connect
int do_connect(struct flow *flow)
Establishes a connection of a flow.
Definition: source.c:153
report::end
struct timespec end
Definition: common.h:293
flows
struct linked_list flows
Definition: daemon.c:103
report::tcp_info
struct fg_tcp_info tcp_info
Definition: common.h:329
REQUEST_START_FLOWS
#define REQUEST_START_FLOWS
Definition: daemon.h:175
flow::current_block_bytes_written
unsigned current_block_bytes_written
Definition: daemon.h:104
timer_check
static void timer_check()
Definition: daemon.c:691
report::request_blocks_read
unsigned request_blocks_read
Definition: common.h:301
flow_settings::mtcp
int mtcp
Set TCP_MTCP (15) on test socket (option -O).
Definition: common.h:242
set_congestion_control
int set_congestion_control(int fd, const char *cc_alg)
Definition: fg_socket.c:265
report::rtt_min
double rtt_min
Minimum round-trip time.
Definition: common.h:321
flow::statistics::iat_sum
double iat_sum
Accumulated interarrival time.
Definition: daemon.h:142
process_rtt
static void process_rtt(struct flow *flow)
Definition: daemon.c:1204
fg_math.h
Routines for statistics and advanced traffic generation.
flow::pcap_thread
pthread_t pcap_thread
Definition: daemon.h:161
started
char started
Definition: daemon.c:105
CONGESTION_LIMIT
#define CONGESTION_LIMIT
Definition: daemon.c:85
level_ipproto_tcp
@ level_ipproto_tcp
Definition: common.h:125
REQUEST_ADD_SOURCE
#define REQUEST_ADD_SOURCE
Definition: daemon.h:174
REQUEST_ADD_DESTINATION
#define REQUEST_ADD_DESTINATION
Definition: daemon.h:173
report::begin
struct timespec begin
Definition: common.h:292
flow_settings::so_debug
int so_debug
Sets SO_DEBUG on test socket (option -O).
Definition: common.h:206
request_get_status::r
struct request r
Definition: daemon.h:249
set_tcp_mtcp
int set_tcp_mtcp(int fd)
Definition: fg_socket.c:347
gettime
int gettime(struct timespec *tp)
Returns the current wall-clock time with nanosecond precision.
Definition: fg_time.c:145
debug.h
Debugging routines for Flowgrind controller and daemon.
daemon.h
Routines used by the Flowgrind daemon.
report::iat_min
double iat_min
Minimum inter-arrival time.
Definition: common.h:309
fg_list_front
const struct list_node * fg_list_front(struct linked_list *const list)
Returns the first element of the list.
Definition: fg_list.c:49
report::response_blocks_read
unsigned response_blocks_read
Definition: common.h:303
flow_settings::route_record
int route_record
Sets ROUTE_RECORD on test socket (option -O).
Definition: common.h:208
flow::statistics
struct flow::statistics statistics[2]