diff --git a/callcontrol/controller.py b/callcontrol/controller.py index ab8fc28..f89828c 100644 --- a/callcontrol/controller.py +++ b/callcontrol/controller.py @@ -1,526 +1,537 @@ # Copyright (C) 2005-2008 AG Projects. See LICENSE for details. # """Implementation of a call control server for OpenSIPS.""" import os import grp import re import cPickle import time from application.configuration import ConfigSection, ConfigSetting from application.process import process from application import log from twisted.internet.protocol import Factory from twisted.protocols.basic import LineOnlyReceiver from twisted.internet import reactor, defer from twisted.python import failure from callcontrol.scheduler import RecurrentCall, KeepRunning from callcontrol.raddb import RadiusDatabase, RadiusDatabaseError from callcontrol.sip import Call from callcontrol.rating import RatingEngineConnections from callcontrol import configuration_filename, backup_calls_file class TimeLimit(int): """A positive time limit (in seconds) or None""" def __new__(typ, value): if value.lower() == 'none': return None try: limit = int(value) except: raise ValueError("invalid time limit value: %r" % value) if limit < 0: raise ValueError("invalid time limit value: %r. should be positive." % value) return limit class TimeoutDetection(str): _values = ('dialog', 'radius') def __init__(self, value): value = value.lower() if value not in self._values: raise ValueError("invalid timeout detection value: %r" % value) if value == 'radius': self.use_radius = True else: self.use_radius = False def __new__(cls, value): return str.__new__(cls, value.lower()) class CallControlConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'CallControl' socket = "%s/socket" % process.runtime_directory group = 'opensips' limit = ConfigSetting(type=TimeLimit, value=None) timeout = 24*60*60 ## timeout calls that are stale for more than 24 hours. setupTime = 90 ## timeout calls that take more than 1'30" to setup. checkInterval = 60 ## check for staled calls and calls that did timeout at every minute. timeout_detection = TimeoutDetection('dialog') ## whether or not to use the radius database to find out terminated calls ## Classes class CommandError(Exception): pass class CallControlError(Exception): pass class NoProviderError(Exception): pass class InvalidRequestError(Exception): pass class CallsMonitor(object): """Check for staled calls and calls that did timeout and were closed by external means""" def __init__(self, period, application): self.application = application self.reccall = RecurrentCall(period, self.run) def run(self): if CallControlConfig.timeout_detection.use_radius: ## Find out terminated calls deferred1 = self.application.db.getTerminatedCalls(self.application.calls) deferred1.addCallbacks(callback=self._clean_calls, errback=self._err_handle, callbackArgs=[self._handle_terminated]) deferred2 = self.application.db.getTimedoutCalls(self.application.calls) deferred2.addCallbacks(callback=self._clean_calls, errback=self._err_handle, callbackArgs=[self._handle_timedout]) defer.DeferredList([deferred1, deferred2]).addCallback(self._finish_checks) else: self._finish_checks(None) return KeepRunning def shutdown(self): self.reccall.cancel() def _clean_calls(self, calls, clean_function): for callid, callinfo in calls.items(): call = self.application.calls.get(callid) if call: self.application.clean_call(callid) clean_function(call, callinfo) def _err_handle(self, fail): log.error("Couldn't query database for terminated/timedout calls: %s" % fail.value) def _handle_terminated(self, call, callinfo): call.end(calltime=callinfo['duration'], reason='calls monitor as terminated') def _handle_timedout(self, call, callinfo): call.end(reason='calls monitor as timedout', sendbye=True) def _finish_checks(self, value): ## Also do the rest of the checking now = time.time() staled = [] nosetup = [] for callid, call in self.application.calls.items(): if not call.complete and (now - call.created >= CallControlConfig.setupTime): self.application.clean_call(callid) nosetup.append(call) elif call.inprogress and call.timer is not None: continue ## this call will be expired by its own timer elif now - call.created >= CallControlConfig.timeout: self.application.clean_call(callid) staled.append(call) ## Terminate staled for call in staled: call.end(reason='calls monitor as staled', sendbye=True) ## Terminate calls that didn't setup in setupTime for call in nosetup: call.end(reason="calls monitor as it didn't setup in %d seconds" % CallControlConfig.setupTime) class CallControlProtocol(LineOnlyReceiver): def lineReceived(self, line): if line.strip() == "": if self.line_buf: self._process() self.line_buf = [] else: self.line_buf.append(line.strip()) def _process(self): try: req = Request(self.line_buf[0], self.line_buf[1:]) except InvalidRequestError, e: self._send_error_reply(failure.Failure(e)) else: # log.debug("Got request: %s" % str(req)) #DEBUG def _unknown_handler(req): req.deferred.errback(failure.Failure(CommandError(req))) try: getattr(self, '_CC_%s' % req.cmd, _unknown_handler)(req) except Exception, e: self._send_error_reply(failure.Failure(e)) else: req.deferred.addCallbacks(callback=self._send_reply, errback=self._send_error_reply) def connectionMade(self): self.line_buf = [] def _send_reply(self, msg): # log.debug('Sent reply: %s' % msg) #DEBUG self.sendLine(msg) def _send_error_reply(self, fail): log.error(fail.value) # log.debug("Sent 'Error' reply") #DEBUG self.sendLine('Error') def _CC_init(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: call = Call(req, self.factory.application) if call.callid in self.factory.application.users.get(call.billingParty, ()): log.error("Call id %s of %s to %s exists in users table but not in calls table" % (call.callid, call.user, call.ruri)) req.deferred.callback('Locked') return if call.billingParty is None: req.deferred.callback('Error') return self.factory.application.calls[req.callid] = call # log.debug("Call id %s added to list of controlled calls" % (call.callid)) #DEBUG else: # The call was previously setup which means it could be in the the users table try: user_calls = self.factory.application.users[call.billingParty] user_calls.remove(call.callid) if len(user_calls) == 0: del self.factory.application.users[call.billingParty] self.factory.application.engines.remove_user(call.billingParty) except (ValueError, KeyError): pass deferred = call.setup(req) deferred.addCallbacks(callback=self._CC_finish_init, errback=self._CC_init_failed, callbackArgs=[req], errbackArgs=[req]) def _CC_finish_init(self, value, req): try: call = self.factory.application.calls[req.callid] except KeyError: log.error("Call id %s disappeared before we could finish initializing it" % req.callid) req.deferred.callback('Error') else: if call.locked: ## prepaid account already locked by another call log.info("Call id %s of %s to %s forbidden because the account is locked" % (req.callid, call.user, call.ruri)) self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('Locked') elif call.timelimit == 0: ## prepaid account with no credit log.info("Call id %s of %s to %s forbidden because credit is too low" % (req.callid, call.user, call.ruri)) self.factory.application.clean_call(req.callid) req.deferred.callback('No credit') elif call.timelimit is None: ## no limit for call log.info("Call id %s of %s to %s is postpaid not limited" % (req.callid, call.user, call.ruri)) self.factory.application.clean_call(req.callid) call.end() - req.deferred.callback('No limit') # No limit - else: + req.deferred.callback('No limit') + else: ## call limited by credit value or a global limit self.factory.application.users.setdefault(call.billingParty, []).append(call.callid) - req.deferred.callback('Limited') # Limited + req.deferred.callback('Limited') def _CC_init_failed(self, fail, req): self._send_error_reply(fail) self.factory.application.clean_call(req.callid) def _CC_start(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Not found') else: call.start(req) req.deferred.callback('Ok') def _CC_stop(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Not found') else: self.factory.application.clean_call(req.callid) call.end(reason='user') req.deferred.callback('Ok') def _CC_debug(self, req): debuglines = [] if req.show == 'sessions': for callid, call in self.factory.application.calls.items(): if not req.user or call.user.startswith(req.user): debuglines.append('Call id %s of %s to %s: %s' % (callid, call.user, call.ruri, call.status)) elif req.show == 'session': try: call = self.factory.application.calls[req.callid] except KeyError: debuglines.append('Call id %s does not exist' % req.callid) else: for key, value in call.items(): debuglines.append('%s: %s' % (key, value)) req.deferred.callback('\r\n'.join(debuglines)+'\r\n') def _CC_terminate(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Call id %s does not exist\r\n' % req.callid) else: self.factory.application.clean_call(req.callid) call.end(reason='admin', sendbye=True) req.deferred.callback('Ok\r\n') class CallControlFactory(Factory): protocol = CallControlProtocol def __init__(self, application): self.application = application class CallControlServer(object): def __init__(self, path=None, group=None): self.path = path or CallControlConfig.socket self.group = group or CallControlConfig.group try: os.unlink(self.path) except OSError: pass self.listening = None self.engines = None self.monitor = None if CallControlConfig.timeout_detection.use_radius: self.db = RadiusDatabase() else: self.db = None self.calls = {} self.users = {} self._restore_calls() def clean_call(self, callid): try: call = self.calls[callid] except KeyError: pass else: del self.calls[callid] try: user_calls = self.users[call.billingParty] user_calls.remove(callid) if len(user_calls) == 0: del self.users[call.billingParty] self.engines.remove_user(call.billingParty) except (ValueError, KeyError): pass # log.debug("Call id %s removed from the list of controlled calls" % callid) #DEBUG def run(self): ## Do the startup stuff self.on_startup() ## And start reactor reactor.run() ## And do the shutdown self.on_shutdown() def stop(self): reactor.stop() def on_startup(self): ## First set up listening on the unix socket try: gid = grp.getgrnam(self.group)[2] mode = 0660 except KeyError, IndexError: gid = -1 mode = 0666 self.listening = reactor.listenUNIX(address=self.path, factory=CallControlFactory(self)) ## Make it writable only to the SIP proxy group members try: os.chown(self.path, -1, gid) os.chmod(self.path, mode) except OSError: log.warn("Couldn't set access rights for %s" % self.path) log.warn("OpenSIPS may not be able to communicate with us!") ## Then setup the CallsMonitor self.monitor = CallsMonitor(CallControlConfig.checkInterval, self) ## Open the connection to the rating engines self.engines = RatingEngineConnections() def on_shutdown(self): if self.listening is not None: self.listening.stopListening() if self.engines is not None: self.engines.shutdown() if self.monitor is not None: self.monitor.shutdown() if self.db is not None: self.db.close() self._save_calls() def _save_calls(self): if self.calls: log.info('Saving calls') calls_file = '%s/%s' % (process.runtime_directory, backup_calls_file) try: f = open(calls_file, 'w') except: pass else: for call in self.calls.values(): call.application = None ## we will mark timers with 'running' or 'idle', depending on their current state, ## to be able to correctly restore them later (Timer objects cannot be pickled) if call.timer is not None: if call.inprogress: call.timer.cancel() call.timer = 'running' ## temporary mark that this timer was running else: call.timer = 'idle' ## temporary mark that this timer was not running failed_dump = False try: try: cPickle.dump(self.calls, f) except Exception, why: log.warn("Failed to dump call list: %s" % why) failed_dump = True finally: f.close() if failed_dump: try: os.unlink(calls_file) except: pass else: log.info("Saved calls: %s" % str(self.calls.keys())) self.calls = {} def _restore_calls(self): calls_file = '%s/%s' % (process.runtime_directory, backup_calls_file) try: f = open(calls_file, 'r') except: pass else: try: self.calls = cPickle.load(f) except Exception, why: log.warn("Failed to load calls saved in the previous session: %s" % why) f.close() try: os.unlink(calls_file) except: pass if self.calls: log.info("Restoring calls saved previously: %s" % str(self.calls.keys())) ## the calls in the 2 sets below are never overlapping because closed and terminated ## calls have different database fingerprints. so the dictionary update below is safe try: db = self.db if self.db is not None else RadiusDatabase() try: terminated = db.query(RadiusDatabase.RadiusTask(None, 'terminated', calls=self.calls)) ## calls terminated by caller/called didtimeout = db.query(RadiusDatabase.RadiusTask(None, 'timedout', calls=self.calls)) ## calls closed by mediaproxy after a media timeout finally: if self.db is None: db.close() except RadiusDatabaseError, e: log.error("Could not query database: %s" % e) else: for callid, call in self.calls.items(): callinfo = terminated.get(callid) or didtimeout.get(callid) if callinfo: ## call already terminated or did timeout in mediaproxy del self.calls[callid] callinfo['call'] = call call.timer = None continue ## close all calls that were already terminated or did timeout count = 0 for callinfo in terminated.values(): call = callinfo.get('call') if call is not None: call.end(calltime=callinfo['duration']) count += 1 for callinfo in didtimeout.values(): call = callinfo.get('call') if call is not None: call.end(sendbye=True) count += 1 if count > 0: log.info("Removed %d already terminated call%s" % (count, 's'*(count!=1))) for callid, call in self.calls.items(): call.application = self if call.timer == 'running': now = time.time() remain = call.starttime + call.timelimit - now if remain < 0: call.timelimit = int(round(now - call.starttime)) remain = 0 call._setup_timer(remain) call.timer.start() elif call.timer == 'idle': call._setup_timer() # also restore users table self.users.setdefault(call.billingParty, []).append(callid) class Request(object): """A request parsed into a structure based on request type""" __methods = {'init': ('callid', 'diverter', 'ruri', 'sourceip', 'from'), 'start': ('callid', 'dialogid'), 'stop': ('callid',), 'debug': ('show',), 'terminate': ('callid',)} def __init__(self, cmd, params): if cmd not in self.__methods.keys(): raise InvalidRequestError("Unknown request: %s" % cmd) try: parameters = dict([re.split(r':\s+', l, 1) for l in params]) except ValueError: raise InvalidRequestError("Badly formatted request") for p in self.__methods[cmd]: try: parameters[p] except KeyError: raise InvalidRequestError("Missing %s from request" % p) self.cmd = cmd self.deferred = defer.Deferred() self.__dict__.update(parameters) try: getattr(self, '_RE_%s' % self.cmd)() except AttributeError: pass def _RE_init(self): self.from_ = self.__dict__['from'] if self.cmd=='init' and self.diverter.lower()=='none': self.diverter = None + try: + self.prepaid + except AttributeError: + self.prepaid = None + else: + if self.prepaid.lower() == 'true': + self.prepaid = True + elif self.prepaid.lower() == 'false': + self.prepaid = False + else: + self.prepaid = None def _RE_debug(self): if self.show == 'session': try: if not self.callid: raise InvalidRequestError("Missing callid from request") except AttributeError: raise InvalidRequestError("Missing callid from request") elif self.show == 'sessions': try: self.user except AttributeError: self.user = None else: raise InvalidRequestError("Illegal value for 'show' attribute in request") def __str__(self): if self.cmd == 'init': - return "%(cmd)s: callid=%(callid)s from=%(from_)s ruri=%(ruri)s diverter=%(diverter)s sourceip=%(sourceip)s" % self.__dict__ + return "%(cmd)s: callid=%(callid)s from=%(from_)s ruri=%(ruri)s diverter=%(diverter)s sourceip=%(sourceip)s prepaid=%(prepaid)s" % self.__dict__ elif self.cmd == 'start': return "%(cmd)s: callid=%(callid)s dialogid=%(dialogid)s" % self.__dict__ elif self.cmd == 'stop': return "%(cmd)s: callid=%(callid)s" % self.__dict__ elif self.cmd == 'debug': return "%(cmd)s: show=%(show)s" % self.__dict__ elif self.cmd == 'terminate': return "%(cmd)s: callid=%(callid)s" % self.__dict__ else: return object.__str__(self) diff --git a/callcontrol/rating.py b/callcontrol/rating.py index 5acfc50..138b7b9 100644 --- a/callcontrol/rating.py +++ b/callcontrol/rating.py @@ -1,314 +1,325 @@ # Copyright (C) 2005-2008 AG Projects. See LICENSE for details. # """Rating engine interface implementation.""" import random import socket from collections import deque from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import EndpointAddress from application.system import default_host_ip from application import log from application.python.util import Singleton from twisted.internet.protocol import ReconnectingClientFactory from twisted.internet.error import TimeoutError from twisted.internet import reactor, defer from twisted.protocols.basic import LineOnlyReceiver from twisted.python import failure from callcontrol import configuration_filename ## ## Rating engine configuration ## class RatingEngineAddress(EndpointAddress): default_port = 9024 name = 'rating engine address' class RatingEngineAddresses(list): def __new__(cls, engines): engines = engines.split() engines = [RatingEngineAddress(engine) for engine in engines] return engines class TimeLimit(int): """A positive time limit (in seconds) or None""" def __new__(typ, value): if value.lower() == 'none': return None try: limit = int(value) except: raise ValueError("invalid time limit value: %r" % value) if limit < 0: raise ValueError("invalid time limit value: %r. should be positive." % value) return limit class RatingConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'CDRTool' address = ConfigSetting(type=RatingEngineAddresses, value=[]) timeout = 500 class CallControlConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'CallControl' prepaid_limit = ConfigSetting(type=TimeLimit, value=None) limit = ConfigSetting(type=TimeLimit, value=None) if not RatingConfig.address: try: RatingConfig.address = RatingEngineAddresses('cdrtool.' + socket.gethostbyaddr(default_host_ip)[0].split('.', 1)[1]) except Exception, e: log.fatal('Cannot resolve hostname %s' % ('cdrtool.' + socket.gethostbyaddr(default_host_ip)[0].split('.', 1)[1])) class RatingError(Exception): pass class RatingEngineError(RatingError): pass class RatingEngineTimeoutError(TimeoutError): pass class RatingRequest(str): def __init__(self, command, reliable=True, **kwargs): self.command = command self.reliable = reliable self.kwargs = kwargs self.deferred = defer.Deferred() def __new__(cls, command, reliable=True, **kwargs): reqstr = command + (kwargs and (' ' + ' '.join("%s=%s" % (name,value) for name, value in kwargs.items())) or '') obj = str.__new__(cls, reqstr) return obj class RatingEngineProtocol(LineOnlyReceiver): delimiter = '\n\n' def __init__(self): self.connected = False self.__request = None self.__timeout_call = None self._request_queue = deque() def connectionMade(self): log.info("Connected to Rating Engine at %s:%d" % (self.transport.getPeer().host, self.transport.getPeer().port)) self.connected = True self.factory.application.connectionMade(self.transport.connector) if self._request_queue: self._send_next_request() def connectionLost(self, reason=None): log.info("Disconnected from Rating Engine at %s:%d" % (self.transport.getPeer().host, self.transport.getPeer().port)) self.connected = False if self.__request is not None: if self.__request.reliable: self._request_queue.appendleft(self.__request) self.__request = None else: self._respond("Connection with the Rating Engine is down: %s" % reason, success=False) self.factory.application.connectionLost(self.transport.connector, reason, self) def timeoutConnection(self): log.info("Connection to Rating Engine at %s:%d timedout" % (self.transport.getPeer().host, self.transport.getPeer().port)) self.transport.loseConnection(RatingEngineTimeoutError()) def lineReceived(self, line): # log.debug("Got reply from rating engine: %s" % line) #DEBUG if not line: return if self.__timeout_call is not None: self.__timeout_call.cancel() if self.__request is None: log.warn("Got reply for unexisting request: %s" % line) return try: self._respond(getattr(self, '_PE_%s' % self.__request.command.lower())(line)) except AttributeError: self._respond("Unknown command in request. Cannot handle reply. Reply is: %s" % line, success=False) except Exception, e: self._respond(str(e), success=False) def _PE_maxsessiontime(self, line): + lines = line.splitlines() + + limit = lines[0].strip().capitalize() try: - limit = line.splitlines()[0].strip().capitalize() - try: - limit = int(limit) - except: - if limit == 'None': - limit = None - elif limit == 'Locked': - pass - else: - raise ValueError("limit must be a non-negative number, None or Locked: %s" % str(limit)) + limit = int(limit) + except: + if limit == 'None': + limit = None + elif limit == 'Locked': + pass else: - if limit < 0: - raise ValueError("limit must be a non-negative number, None or Locked: %s" % str(limit)) - except Exception, e: - raise e - return limit + raise ValueError("limit must be a non-negative number, None or Locked: %s" % limit) + else: + if limit < 0: + raise ValueError("limit must be a non-negative number, None or Locked: %s" % limit) + + info = dict(line.split('=', 1) for line in lines[1:]) + if 'type' in info: + type = info['type'].lower() + if type == 'prepaid': + prepaid = True + elif type == 'postpaid': + prepaid = False + else: + raise ValueError("prepaid must be either True or False: %s" % prepaid) + else: + prepaid = limit is not None + return limit, prepaid def _PE_debitbalance(self, line): valid_answers = ('Ok', 'Failed', 'Not prepaid') lines = line.splitlines() result = lines[0].strip().capitalize() if result not in valid_answers: log.error("Invalid reply from rating engine: `%s'" % lines[0].strip()) log.warn("Rating engine possible failed query: %s" % self.__request) raise RatingEngineError('Invalid rating engine response') elif result == 'Failed': log.warn("Rating engine failed query: %s" % self.__request) raise RatingEngineError('Rating engine failed query') else: try: timelimit = int(lines[1].split('=', 1)[1].strip()) totalcost = lines[2].strip() except: log.error("Invalid reply from rating engine for DebitBalance on lines 2, 3: `%s'" % ("', `".join(lines[1:3]))) timelimit = None totalcost = 0 return timelimit, totalcost def _send_next_request(self): if self.connected: self.__request = self._request_queue.popleft() self.sendLine(self.__request) self._set_timeout() # log.debug("Sent request to rating engine: %s" % self.__request) #DEBUG else: self.__request = None def _respond(self, result, success=True): if self.__request is not None: req = self.__request self.__request = None try: if success: req.deferred.callback(result) else: req.deferred.errback(failure.Failure(RatingEngineError(result))) except defer.AlreadyCalledError: log.debug("Request %s was already responded to" % str(req)) if self._request_queue: self._send_next_request() def _set_timeout(self, timeout=None): if timeout is None: timeout = self.factory.timeout self.__timeout_call = reactor.callLater(timeout/1000.0, self.timeoutConnection) def send_request(self, request): if not request.reliable and not self.connected: request.deferred.errback(failure.Failure(RatingEngineError("Connection with the Rating Engine is down"))) return request self._request_queue.append(request) if self.__request is None: self._send_next_request() return request class RatingEngineFactory(ReconnectingClientFactory): protocol = RatingEngineProtocol timeout = RatingConfig.timeout # reconnect parameters maxDelay = 15 factor = maxDelay initialDelay = 1.0/factor delay = initialDelay def __init__(self, application): self.application = application def buildProtocol(self, addr): self.resetDelay() return ReconnectingClientFactory.buildProtocol(self, addr) def clientConnectionFailed(self, connector, reason): if self.application.disconnecting: return ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) def clientConnectionLost(self, connector, reason): if self.application.disconnecting: return ReconnectingClientFactory.clientConnectionLost(self, connector, reason) class RatingEngine(object): def __init__(self, address): self.address = address self.disconnecting = False self.connector = reactor.connectTCP(self.address[0], self.address[1], factory=RatingEngineFactory(self)) self.connection = None self.__unsent_req = deque() def shutdown(self): self.disconnecting = True self.connector.disconnect() def connectionMade(self, connector): self.connection = connector.transport self.connection.protocol._request_queue.extend(self.__unsent_req) for req in self.__unsent_req: log.debug("Requeueing request for the rating engine: %s" % (req,)) self.__unsent_req.clear() def connectionLost(self, connector, reason, protocol): while protocol._request_queue: req = protocol._request_queue.pop() if not req.reliable: log.debug("Request is considered failed: %s" % (req,)) req.deferred.errback(failure.Failure(RatingEngineError("Connection with the Rating Engine is down"))) else: log.debug("Saving request to be requeued later: %s" % (req,)) self.__unsent_req.appendleft(req) self.connection = None def getCallLimit(self, call, max_duration=CallControlConfig.prepaid_limit, reliable=True): max_duration = max_duration or CallControlConfig.limit or 36000 args = {} if call.inprogress: args['State'] = 'Connected' req = RatingRequest('MaxSessionTime', reliable=reliable, CallId=call.callid, From=call.billingParty, To=call.ruri, Gateway=call.sourceip, Duration=max_duration, **args) if self.connection is not None: return self.connection.protocol.send_request(req).deferred else: self.__unsent_req.append(req) return req.deferred def debitBalance(self, call, reliable=True): req = RatingRequest('DebitBalance', reliable=reliable, CallId=call.callid, From=call.billingParty, To=call.ruri, Gateway=call.sourceip, Duration=call.duration) if self.connection is not None: return self.connection.protocol.send_request(req).deferred else: self.__unsent_req.append(req) return req.deferred class RatingEngineConnections(object): __metaclass__ = Singleton def __init__(self): self.connections = [RatingEngine(engine) for engine in RatingConfig.address] self.user_connections = {} @staticmethod def getConnection(call=None): engines = RatingEngineConnections() conn = random.choice(engines.connections) if call is None: return conn return engines.user_connections.setdefault(call.billingParty, conn) def remove_user(self, user): try: del self.user_connections[user] except KeyError: pass def shutdown(self): for engine in self.connections: engine.shutdown() diff --git a/callcontrol/sip.py b/callcontrol/sip.py index f92bb70..8c7a6b8 100644 --- a/callcontrol/sip.py +++ b/callcontrol/sip.py @@ -1,315 +1,322 @@ # Copyright (C) 2005-2008 AG Projects. See LICENSE for details. # """ Implementation of Call objects used to store call information and manage a call. """ import time import re from application import log from twisted.internet.error import AlreadyCalled from twisted.internet import reactor, defer from callcontrol.rating import RatingEngineConnections from callcontrol.opensips import DialogID, ManagementInterface +class CallError(Exception): pass class SipError(Exception): pass ## ## Call data types ## class InvalidRequestError(Exception): pass class ReactorTimer(object): def __init__(self, delay, function, args=[], kwargs={}): self.calldelay = delay self.function = function self.args = args self.kwargs = kwargs self.dcall = None def start(self): if self.dcall is None: self.dcall = reactor.callLater(self.calldelay, self.function, *self.args, **self.kwargs) def cancel(self): if self.dcall is not None: try: self.dcall.cancel() except AlreadyCalled: self.dcall = None def delay(self, seconds): if self.dcall is not None: try: self.dcall.delay(seconds) except AlreadyCalled: self.dcall = None def reset(self, seconds): if self.dcall is not None: try: self.dcall.reset(seconds) except AlreadyCalled: self.dcall = None class Structure(dict): def __init__(self): dict.__init__(self) def __getitem__(self, key): elements = key.split('.') obj = self ## start with ourselves for e in elements: if not isinstance(obj, dict): raise TypeError("unsubscriptable object") obj = dict.__getitem__(obj, e) return obj def __setitem__(self, key, value): self.__dict__[key] = value dict.__setitem__(self, key, value) def __delitem__(self, key): dict.__delitem__(self, key) del self.__dict__[key] __setattr__ = __setitem__ def __delattr__(self, name): try: del self.__dict__[name] except KeyError: raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, name)) else: dict.__delitem__(self, name) def update(self, other): dict.update(self, other) for key, value in other.items(): self.__dict__[key] = value class Call(Structure): """Defines a call""" def __init__(self, request, application): Structure.__init__(self) - self.prepaid = False + self.prepaid = request.prepaid self.locked = False ## if the account is locked because another call is in progress self.expired = False ## if call did consume its timelimit before being terminated self.created = time.time() self.timer = None self.starttime = None self.endtime = None self.timelimit = None self.duration = 0 self.callid = request.callid self.dialogid = None self.diverter = request.diverter self.ruri = request.ruri self.sourceip = request.sourceip self['from'] = request.from_ ## from is a python keyword ## Determine who will pay for the call if self.diverter is not None: self.billingParty = 'sip:%s' % self.diverter self.user = self.diverter else: match = re.search(r'(?P
sip:(?P