Codebase list sipp / edb1bbd2-a8bf-4e10-bcd2-4665dfe16fe0/main opentask.cpp
edb1bbd2-a8bf-4e10-bcd2-4665dfe16fe0/main

Tree @edb1bbd2-a8bf-4e10-bcd2-4665dfe16fe0/main (Download .tar.gz)

opentask.cpp @edb1bbd2-a8bf-4e10-bcd2-4665dfe16fe0/main · raw · history · blame

/*
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 *  Author : Richard GAYRAUD - 04 Nov 2003
 *           Marc LAMBERTON
 *           Olivier JACQUES
 *           Herve PELLAN
 *           David MANSUTTI
 *           Francois-Xavier Kowalski
 *           Gerard Lyonnaz
 *           From Hewlett Packard Company.
 *           F. Tarek Rogers
 *           Peter Higginson
 *           Vincent Luba
 *           Shriram Natarajan
 *           Guillaume Teissier from FTR&D
 *           Clement Chen
 *           Wolfgang Beck
 *           Charles P Wright from IBM Research
 */
#include "sipp.hpp"

class opentask *opentask::instance = NULL;
unsigned long opentask::calls_since_last_rate_change = 0;
unsigned long opentask::last_rate_change_time = 0;

void opentask::initialize() {
  assert(instance == NULL);
  instance = new opentask();
}

/* A freshly constructed task is immediately put in the running state. */
opentask::opentask() {
  this->setRunning();
}

/* Clears the static instance pointer so the static helpers (set_paused,
 * set_users, freeUser) see that no task exists any more. */
opentask::~opentask() {
  opentask::instance = NULL;
}

/*
 * Emits a one-line description of this task, including the current call
 * rate, through WARNING.
 */
void opentask::dump() {
  /* rate is handled as a floating-point quantity in this file (set_rate()
   * assigns it from a double), so it has to be formatted with %f. Passing it
   * to %d is undefined behaviour. The explicit cast keeps the format and the
   * argument type in agreement whatever rate is declared as. */
  WARNING("Uniform rate call generation task: %f", static_cast<double>(rate));
}

/*
 * Computes the wake-up value for this task.
 *
 * Returns 0 when call generation is paused or when running in users mode
 * (users >= 0). In rate mode it returns a time derived from the last rate
 * change, the number of calls opened since then and the configured rate,
 * never less than 1.
 */
unsigned int opentask::wake() {
  /* Paused, or users mode: in the latter case we need to wait until another
   * call is terminated. */
  if (paused || users >= 0) {
    return 0;
  }

  /* We need to compute when the next call is going to be opened. */
  return (unsigned long)
      MAX(last_rate_change_time + (calls_since_last_rate_change /
              MAX(rate/MAX(rate_period_ms, 1), 1)), 1);
}

/*
 * Opens the outgoing calls that are currently due.
 *
 * The number of calls to open is derived from the user count (users mode,
 * users >= 0) or from the configured rate and the time elapsed since the
 * last rate change, then limited by stop_after and open_calls_allowed.
 *
 * Returns false after deleting this task when quitting is set, and false
 * (after setting quitting) when the call total sampled at entry has reached
 * stop_after; returns true otherwise.
 */
bool opentask::run() {
  int calls_to_open = 0;

  /* Shutting down: destroy the task (the destructor clears `instance`). */
  if (quitting) {
    delete this;
    return false;
  }

  /* Call generation is paused: put the task in the paused state, open nothing. */
  if (paused) {
    setPaused();
    return true;
  }

  long l=0;
  /* Both counters are sampled once here and are not refreshed below. */
  unsigned long long current_calls = main_scenario->stats->GetStat(CStat::CPT_C_CurrentCall);
  unsigned long long total_calls = main_scenario->stats->GetStat(CStat::CPT_C_IncomingCallCreated) + main_scenario->stats->GetStat(CStat::CPT_C_OutgoingCallCreated);

  if (users >= 0) {
    /* Users mode: top up to `users` concurrent calls. */
    calls_to_open = ((l = (users - current_calls)) > 0) ? l : 0;
  } else {
    /* Rate mode: calls that should have been opened since the last rate
     * change, minus those already opened; never negative. */
    calls_to_open = (unsigned int)
      ((l=(long)floor(((clock_tick - last_rate_change_time) * rate/rate_period_ms)
		      - calls_since_last_rate_change))>0?l:0);
  }

  /* Do not exceed the total number of calls requested. */
  if (total_calls + calls_to_open > stop_after) {
    calls_to_open = stop_after - total_calls;
  }

  /* Do not exceed the concurrent-call limit; a limit of 0 disables this check. */
  if (open_calls_allowed && (current_calls + calls_to_open > open_calls_allowed)) {
    calls_to_open = open_calls_allowed - current_calls;
  }

  /* The two adjustments above can yield a negative count. */
  if (calls_to_open < 0) {
    calls_to_open = 0;
  }

  /* Reference time used to bound how long the loop below may run. */
  unsigned int start_clock = getmilliseconds();


  /* Note: the post-decrement leaves calls_to_open at -1 when the loop runs
   * to completion; on an early break it holds the number still to open. */
  while(calls_to_open--)
  {
      /* Associate a user with this call, if we are in users mode. */
      int userid = 0;
      if (users >= 0) {
	userid = freeUsers.back();
	freeUsers.pop_back();
      }

      // adding a new OUTGOING CALL
      main_scenario->stats->computeStat(CStat::E_CREATE_OUTGOING_CALL);
      call * call_ptr = call::add_call(userid, local_ip_is_ipv6, use_remote_sending_addr ? &remote_sending_sockaddr : &remote_sockaddr);
      if(!call_ptr) {
	/* NOTE(review): call_ptr is dereferenced below without a further
	 * check, so ERROR presumably does not return -- confirm. */
	ERROR("Out of memory allocating call!");
      }

      calls_since_last_rate_change++;

      outbound_congestion = false;

      /* Without multisocket every call shares one socket, chosen by transport. */
      if (!multisocket) {
	switch(transport) {
	  case T_UDP:
	    call_ptr->associate_socket(main_socket);
	    main_socket->ss_count++;
	    break;
	  case T_TCP:
	  case T_SCTP:
	  case T_TLS:
	    call_ptr->associate_socket(tcp_multiplex);
	    tcp_multiplex->ss_count++;
	    break;
	}
      }
      /* Stop opening calls once the millisecond clock has moved past the
       * value sampled before the loop; the rest is left for a later run. */
      if (getmilliseconds() > start_clock) {
	break;
      }
    }

    /* We can pause. */
    if (calls_to_open <= 0) {
      setPaused();
    } else {
      /* Stay running. */
    }

    // Quit after asked number of calls is reached
    /* total_calls was sampled before the loop, so calls opened during this
     * invocation are not included in this comparison. */
    if(total_calls >= stop_after) {
      quitting = 1;
      return false;
    }

    return true;
}

/*
 * Pauses or resumes call generation.
 *
 * new_paused: true to pause the task, false to resume it. On resume the
 * current user count (users mode) or rate (rate mode) is re-applied through
 * set_users()/set_rate() before the paused flag is updated.
 *
 * Has no effect when no opentask instance exists.
 */
void opentask::set_paused(bool new_paused)
{
  /* Doesn't do anything, we must be in server mode. */
  if (instance == NULL) {
    return;
  }

  if (!new_paused) {
    instance->setRunning();
    if (users < 0) {
      set_rate(rate);
    } else {
      set_users(users);
    }
  } else {
    instance->setPaused();
  }

  paused = new_paused;
}

/*
 * Applies a new call rate and restarts the rate accounting window.
 *
 * new_rate: requested rate; negative values are clamped to 0.
 *
 * Has no effect when no opentask instance exists, matching set_paused() and
 * set_users(). Otherwise the rate is stored, last_rate_change_time and
 * calls_since_last_rate_change are reset, and -- unless the limit was set
 * by the user (open_calls_user_setting) -- open_calls_allowed is recomputed
 * from the rate and the call duration.
 */
void opentask::set_rate(double new_rate)
{
  if (!instance) {
    /* Doesn't do anything, we must be in server mode. */
    return;
  }

  rate = new_rate;
  if(rate < 0) {
    rate = 0;
  }

  last_rate_change_time = getmilliseconds();
  calls_since_last_rate_change = 0;

  if(!open_calls_user_setting) {

    /* Use the larger of the scenario duration and the global duration,
     * with a floor of 1000 (presumably milliseconds -- confirm). */
    int call_duration_min =  main_scenario->duration;

    if(duration > call_duration_min) call_duration_min = duration;

    if(call_duration_min < 1000) call_duration_min = 1000;

    /* Allow three times the number of calls expected to be in progress at
     * this rate, but never fewer than one. */
    open_calls_allowed = (int)((3.0 * rate * call_duration_min) / (double)rate_period_ms);
    if(!open_calls_allowed) {
      open_calls_allowed = 1;
    }
  }
}

/*
 * Sets the number of simulated users, which in users mode is also the
 * concurrent-call limit.
 *
 * new_users: requested user count; negative values are treated as 0.
 *
 * When the count grows, one user id is added to freeUsers for each extra
 * user: retired ids are reused first, otherwise a new id is created along
 * with its variable table. The rate accounting window is then restarted and
 * the task is set running. Has no effect when no opentask instance exists.
 */
void opentask::set_users(int new_users)
{
  /* Doesn't do anything, we must be in server mode. */
  if (instance == NULL) {
    return;
  }

  const int target_users = (new_users < 0) ? 0 : new_users;
  assert(users >= 0);

  /* Grow the pool one user at a time until it reaches the target. */
  for (; users < target_users; users++) {
    int id;
    if (retiredUsers.empty()) {
      id = users + 1;
      userVarMap[id] = new VariableTable(userVariables);
    } else {
      id = retiredUsers.back();
      retiredUsers.pop_back();
    }
    freeUsers.push_front(id);
  }

  /* This also covers a shrinking user count, where the loop did not run. */
  users = open_calls_allowed = target_users;

  calls_since_last_rate_change = 0;
  last_rate_change_time = clock_tick;

  assert(open_calls_user_setting);

  instance->setRunning();
}

/*
 * Returns a user id to the pool once its call is finished.
 *
 * userId: the id being released.
 *
 * If more calls are currently in progress than open_calls_allowed, the id
 * is retired; otherwise it becomes available again and the opentask, if one
 * exists, is set running.
 */
void opentask::freeUser(int userId) {
  const bool over_limit =
      main_scenario->stats->GetStat(CStat::CPT_C_CurrentCall) > open_calls_allowed;

  if (over_limit) {
    retiredUsers.push_front(userId);
    return;
  }

  freeUsers.push_front(userId);

  /* Wake up the call creation thread. */
  if (instance != NULL) {
    instance->setRunning();
  }
}