diff --git a/call-control b/call-control index 2782a66..fc4b2c0 100755 --- a/call-control +++ b/call-control @@ -1,90 +1,90 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 """Call control engine for OpenSIPS""" if __name__ == '__main__': import callcontrol import sys from application import log from application.process import process, ProcessError from argparse import ArgumentParser name = 'call-control' fullname = 'SIP call-control engine' description = 'Implementation of a call-control engine for SIP' process.configuration.user_directory = None process.configuration.subdirectory = 'callcontrol' process.runtime.subdirectory = 'callcontrol' parser = ArgumentParser(usage='%(prog)s [options]') parser.add_argument('--version', action='version', version='%(prog)s {}'.format(callcontrol.__version__)) parser.add_argument('--systemd', action='store_true', help='run as a systemd simple service and log to journal') parser.add_argument('--no-fork', action='store_false', dest='fork', help='run in the foreground and log to the terminal') parser.add_argument('--config-dir', dest='config_directory', default=None, help='the configuration directory ({})'.format(process.configuration.system_directory), metavar='PATH') parser.add_argument('--runtime-dir', dest='runtime_directory', default=None, help='the runtime directory ({})'.format(process.runtime.directory), metavar='PATH') parser.add_argument('--debug', action='store_true', help='enable verbose logging') parser.add_argument('--debug-memory', action='store_true', help='enable memory debugging') options = parser.parse_args() log.Formatter.prefix_format = '{record.levelname:<8s} ' if options.config_directory is not None: process.configuration.local_directory = options.config_directory if options.runtime_directory is not None: process.runtime.directory = options.runtime_directory try: process.runtime.create_directory() except ProcessError as e: log.critical('Cannot start %s: %s', fullname, e) sys.exit(1) if options.systemd: from systemd.journal import JournalHandler log.set_handler(JournalHandler(SYSLOG_IDENTIFIER=name)) log.capture_output() elif options.fork: try: process.daemonize(pidfile='{}.pid'.format(name)) - except ProcessError, e: + except ProcessError as e: log.critical('Cannot start %s: %s', fullname, e) sys.exit(1) log.use_syslog(name) log.info('Starting %s %s', fullname, callcontrol.__version__) try: process.wait_for_network(wait_time=10, wait_message='Waiting for network to become available...') except KeyboardInterrupt: sys.exit(0) except RuntimeError as e: log.critical('Cannot start %s: %s' % (fullname, e)) sys.exit(1) from callcontrol.controller import CallControlServer if options.debug: log.level.current = log.level.DEBUG if options.debug_memory: from application.debug.memory import memory_dump try: server = CallControlServer() - except Exception, e: + except Exception as e: log.critical('Could not create %s: %s', fullname, e) if type(e) is not RuntimeError: log.exception() sys.exit(1) try: server.run() - except Exception, e: + except Exception as e: log.critical('Could not run %s: %s', fullname, e) if type(e) is not RuntimeError: log.exception() if options.debug_memory: memory_dump() diff --git a/call-control-cli b/call-control-cli index 3ac773e..be0edad 100755 --- a/call-control-cli +++ b/call-control-cli @@ -1,117 +1,117 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 import callcontrol import socket import sys from application import log from application.process import process from argparse import ArgumentParser class CallControlCommand(object): def __init__(self, command, **kw): self.command = command self.kw = kw def __str__(self): arguments = [self.command] arguments.extend('{}: {}'.format(key, value) for key, value in self.kw.items()) return '\r\n'.join(arguments) + '\r\n\r\n' def execute(self): target = process.runtime.file('socket') sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: # noinspection PyShadowingNames try: sock.connect(target) - sock.sendall(str(self)) - response = '' + sock.sendall(str(self).encode()) + response = b'' while True: data = sock.recv(4096) response += data - if not data or data.endswith('\r\n\r\n'): + if not data or data.endswith(b'\r\n\r\n'): break except socket.error as e: raise RuntimeError('could not send command to {}: {}'.format(target, e)) finally: sock.close() for line in response.rstrip().splitlines(): print(line) @classmethod def handler(cls, options): raise NotImplementedError class ListCommand(CallControlCommand): def __init__(self, user=None): if user is not None: kw = dict(command='debug', show='sessions', user=user) else: kw = dict(command='debug', show='sessions') - super(ListCommand, self).__init__(**kw) + super().__init__(**kw) @classmethod def handler(cls, options): return cls(options.user).execute() class ShowCommand(CallControlCommand): def __init__(self, call_id): - super(ShowCommand, self).__init__(command='debug', show='session', callid=call_id) + super().__init__(command='debug', show='session', callid=call_id) @classmethod def handler(cls, options): return cls(options.call_id).execute() class TerminateCommand(CallControlCommand): def __init__(self, call_id): - super(TerminateCommand, self).__init__(command='terminate', callid=call_id) + super().__init__(command='terminate', callid=call_id) @classmethod def handler(cls, options): return cls(options.call_id).execute() if __name__ == '__main__': name = 'call-control-cli' description = 'Command line interface tool for call-control' log.Formatter.prefix_format = '{record.levelname:<8s} ' process.configuration.user_directory = None process.configuration.subdirectory = 'callcontrol' process.runtime.subdirectory = 'callcontrol' parser = ArgumentParser(description="This script can issue commands to a running instance of a call-control server. Use '%(prog)s COMMAND --help' for help on a specific command.") parser.add_argument('--version', action='version', version='%(prog)s {}'.format(callcontrol.__version__)) parser.add_argument('--runtime-dir', dest='runtime_directory', default=None, help='the runtime directory ({})'.format(process.runtime.directory), metavar='PATH') subparsers = parser.add_subparsers(title='supported commands', dest='command') parser_list = subparsers.add_parser('list', help='list existing sessions') parser_list.add_argument('user', nargs='?', help='optional user to filter results by') parser_list.set_defaults(handler=ListCommand.handler) parser_show = subparsers.add_parser('show', help='show details about a specific session') parser_show.add_argument('call_id', help='the call-id for the session') parser_show.set_defaults(handler=ShowCommand.handler) parser_terminate = subparsers.add_parser('terminate', help='terminate a specific session') parser_terminate.add_argument('call_id', help='the call-id for the session') parser_terminate.set_defaults(handler=TerminateCommand.handler) args = parser.parse_args() if args.runtime_directory is not None: process.runtime.directory = args.runtime_directory try: args.handler(args) except Exception as e: log.critical('Failed to execute command: {}'.format(e)) sys.exit(1) diff --git a/callcontrol/__init__.py b/callcontrol/__init__.py index 4164cf0..f557eb2 100644 --- a/callcontrol/__init__.py +++ b/callcontrol/__init__.py @@ -1,8 +1,8 @@ """OpenSIPS Call control""" -__version__ = "3.0.1" +__version__ = "4.0.0" -configuration_file = 'config.ini' +configuration_file = '/etc/callcontrol/config.ini' backup_calls_file = 'calls.dat' diff --git a/callcontrol/controller.py b/callcontrol/controller.py index b8afa26..452def5 100644 --- a/callcontrol/controller.py +++ b/callcontrol/controller.py @@ -1,507 +1,514 @@ """Implementation of a call control server for OpenSIPS.""" import os import grp import re import pickle import time from application import log from application.configuration import ConfigSection, ConfigSetting from application.process import process from application.system import unlink from twisted.internet.protocol import Factory from twisted.protocols.basic import LineOnlyReceiver from twisted.internet import reactor, defer from twisted.python import failure from callcontrol.scheduler import RecurrentCall, KeepRunning from callcontrol.raddb import RadiusDatabase, RadiusDatabaseError from callcontrol.sip import Call from callcontrol.rating import RatingEngineConnections from callcontrol import configuration_file, backup_calls_file class TimeLimit(int): """A positive time limit (in seconds) or None""" def __new__(typ, value): if value.lower() == 'none': return None try: limit = int(value) except: raise ValueError("invalid time limit value: %r" % value) if limit < 0: raise ValueError("invalid time limit value: %r. should be positive." % value) return limit class CallControlConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'CallControl' socket = process.runtime.file('socket') group = 'opensips' limit = ConfigSetting(type=TimeLimit, value=None) timeout = 24*60*60 # timeout calls that are stale for more than 24 hours. setupTime = 90 # timeout calls that take more than 1'30" to setup. checkInterval = 60 # check for staled calls and calls that did timeout at every minute. class CommandError(Exception): pass class InvalidRequestError(Exception): pass class CallsMonitor(object): """Check for staled calls""" def __init__(self, period, application): self.application = application self.reccall = RecurrentCall(period, self.run) def run(self): now = time.time() staled = [] nosetup = [] for callid, call in list(self.application.calls.items()): if not call.complete and (now - call.created >= CallControlConfig.setupTime): self.application.clean_call(callid) nosetup.append(call) elif call.inprogress and call.timer is not None: continue # this call will be expired by its own timer elif now - call.created >= CallControlConfig.timeout: self.application.clean_call(callid) staled.append(call) # Terminate staled for call in staled: call.end(reason='calls monitor as staled', sendbye=True) # Terminate calls that didn't setup in setupTime for call in nosetup: call.end(reason="calls monitor as it didn't setup in %d seconds" % CallControlConfig.setupTime) return KeepRunning def shutdown(self): self.reccall.cancel() class CallControlProtocol(LineOnlyReceiver): def lineReceived(self, line): + line = line.decode('utf-8') + if line.strip() == "": if self.line_buf: self._process() self.line_buf = [] else: self.line_buf.append(line.strip()) def _process(self): try: req = Request(self.line_buf[0], self.line_buf[1:]) except InvalidRequestError as e: + log.info("Invalid OpenSIPS request: %s" % str(e)) self._send_error_reply(failure.Failure(e)) else: - # log.debug('Got request: %s', req) + log.debug('Received request from OpenSIPS %s', req) def _unknown_handler(req): req.deferred.errback(failure.Failure(CommandError(req))) + try: + getattr(self, '_CC_%s' % req.cmd, _unknown_handler)(req) except Exception as e: + self._send_error_reply(failure.Failure(e)) else: req.deferred.addCallbacks(callback=self._send_reply, errback=self._send_error_reply) def connectionMade(self): self.line_buf = [] def _send_reply(self, msg): - # log.debug('Sent reply: %s', msg) - self.sendLine(msg) + log.debug('Send response to OpenSIPS: %s', msg) + self.sendLine(msg.encode('utf-8')) def _send_error_reply(self, fail): log.error(fail.value) - # log.debug("Sent 'Error' reply") - self.sendLine('Error') + log.info("Sent 'Error' response to OpenSIPS") + self.sendLine(b'Error') def _CC_init(self, req): + #import pdb; pdb.set_trace() try: call = self.factory.application.calls[req.callid] except KeyError: call = Call(req, self.factory.application) if call.billingParty is None: req.deferred.callback('Error') return self.factory.application.calls[req.callid] = call # log.debug('Call id %s added to list of controlled calls', call.callid) else: if call.token != req.call_token: log.error("Call id %s is duplicated" % call.callid) req.deferred.callback('Duplicated callid') return # The call was previously setup which means it could be in the the users table try: user_calls = self.factory.application.users[call.billingParty] user_calls.remove(call.callid) if len(user_calls) == 0: del self.factory.application.users[call.billingParty] self.factory.application.engines.remove_user(call.billingParty) except (ValueError, KeyError): pass deferred = call.setup(req) deferred.addCallbacks(callback=self._CC_finish_init, errback=self._CC_init_failed, callbackArgs=[req], errbackArgs=[req]) def _CC_finish_init(self, value, req): try: call = self.factory.application.calls[req.callid] except KeyError: log.error("Call id %s disappeared before we could finish initializing it" % req.callid) req.deferred.callback('Error') else: if req.call_limit is not None and len(self.factory.application.users.get(call.billingParty, ())) >= req.call_limit: log.info("Call id %s of %s to %s forbidden because limit has been reached" % (req.callid, call.user, call.ruri)) self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('Call limit reached') elif call.locked: # prepaid account already locked by another call log.info("Call from %s to %s is forbidden because account is locked" % (call.user, call.ruri, req.callid)) self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('Locked') elif call.timelimit == 0: # prepaid account with no credit log.info("Call from %s to %s is forbidden because of low credit (%s)" % (call.user, call.ruri, req.callid)) self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('No credit') elif req.call_limit is not None or call.timelimit is not None: # call limited by credit value, a global time limit or number of calls log.info("%s can make %s concurrent calls (%s)" % (call.billingParty, req.call_limit or "unlimited", req.callid)) self.factory.application.users.setdefault(call.billingParty, []).append(call.callid) req.deferred.callback('Limited') else: # no limit for call log.info("Call from %s to %s is postpaid without limits (%s)" % (call.user, call.ruri, req.callid)) self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('No limit') def _CC_init_failed(self, fail, req): self._send_error_reply(fail) self.factory.application.clean_call(req.callid) def _CC_start(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Not found') else: call.start(req) req.deferred.callback('Ok') def _CC_stop(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Not found') else: self.factory.application.clean_call(req.callid) call.end(reason='user') req.deferred.callback('Ok') def _CC_debug(self, req): debuglines = [] if req.show == 'sessions': for callid, call in list(self.factory.application.calls.items()): if not req.user or call.user.startswith(req.user): debuglines.append('Call id %s of %s to %s: %s' % (callid, call.user, call.ruri, call.status)) elif req.show == 'session': try: call = self.factory.application.calls[req.callid] except KeyError: debuglines.append('Call id %s does not exist' % req.callid) else: for key, value in list(call.items()): debuglines.append('%s: %s' % (key, value)) req.deferred.callback('\r\n'.join(debuglines)+'\r\n') def _CC_terminate(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Call id %s does not exist\r\n' % req.callid) else: self.factory.application.clean_call(req.callid) call.end(reason='admin', sendbye=True) req.deferred.callback('Ok\r\n') class CallControlFactory(Factory): protocol = CallControlProtocol def __init__(self, application): self.application = application class CallControlServer(object): def __init__(self): unlink(CallControlConfig.socket) self.path = CallControlConfig.socket self.group = CallControlConfig.group self.listening = None self.engines = None self.monitor = None self.calls = {} self.users = {} self._restore_calls() def clean_call(self, callid): try: call = self.calls[callid] except KeyError: return else: del self.calls[callid] user_calls = self.users.get(call.billingParty, []) try: user_calls.remove(callid) except ValueError: pass if not user_calls: self.users.pop(call.billingParty, None) self.engines.remove_user(call.billingParty) # log.debug('Call id %s removed from the list of controlled calls', callid) def run(self): reactor.addSystemEventTrigger('before', 'startup', self.on_startup) reactor.addSystemEventTrigger('before', 'shutdown', self.on_shutdown) reactor.run() def stop(self): reactor.stop() def on_startup(self): # First set up listening on the unix socket try: gid = grp.getgrnam(self.group)[2] mode = 0o660 except (KeyError, IndexError): gid = -1 mode = 0o666 self.listening = reactor.listenUNIX(address=self.path, factory=CallControlFactory(self)) # Make it writable only to the SIP proxy group members try: os.chown(self.path, -1, gid) os.chmod(self.path, mode) except OSError: log.warning("Couldn't set access rights for %s" % self.path) log.warning("OpenSIPS may not be able to communicate with us!") # Then setup the CallsMonitor self.monitor = CallsMonitor(CallControlConfig.checkInterval, self) # Open the connection to the rating engines self.engines = RatingEngineConnections() def on_shutdown(self): should_close = [] if self.listening is not None: self.listening.stopListening() if self.engines is not None: should_close.append(self.engines.shutdown()) if self.monitor is not None: self.monitor.shutdown() d = defer.DeferredList(should_close) d.addBoth(self._save_calls) return d def _save_calls(self, result): if self.calls: log.info('Saving calls') calls_file = process.runtime.file(backup_calls_file) try: - f = open(calls_file, 'w') + f = open(calls_file, 'wb') except: pass else: for call in list(self.calls.values()): call.application = None # we will mark timers with 'running' or 'idle', depending on their current state, # to be able to correctly restore them later (Timer objects cannot be pickled) if call.timer is not None: if call.inprogress: call.timer.cancel() call.timer = 'running' # temporary mark that this timer was running else: call.timer = 'idle' # temporary mark that this timer was not running failed_dump = False try: try: pickle.dump(self.calls, f) except Exception as e: log.warning('Failed to dump call list: %s', e) failed_dump = True finally: f.close() if failed_dump: unlink(calls_file) else: log.info("Saved calls: %s" % str(list(self.calls.keys()))) self.calls = {} def _restore_calls(self): calls_file = process.runtime.file(backup_calls_file) try: f = open(calls_file, 'r') except: pass else: try: self.calls = pickle.load(f) except Exception as e: log.warning('Failed to load calls saved in the previous session: %s', e) f.close() unlink(calls_file) if self.calls: log.info("Restoring calls saved previously: %s" % str(list(self.calls.keys()))) # the calls in the 2 sets below are never overlapping because closed and terminated # calls have different database fingerprints. so the dictionary update below is safe try: db = RadiusDatabase() try: terminated = db.query(RadiusDatabase.RadiusTask(None, 'terminated', calls=self.calls)) # calls terminated by caller/called didtimeout = db.query(RadiusDatabase.RadiusTask(None, 'timedout', calls=self.calls)) # calls closed by mediaproxy after a media timeout finally: db.close() except RadiusDatabaseError as e: log.error("Could not query database: %s" % e) else: for callid, call in list(self.calls.items()): callinfo = terminated.get(callid) or didtimeout.get(callid) if callinfo: # call already terminated or did timeout in mediaproxy del self.calls[callid] callinfo['call'] = call call.timer = None continue # close all calls that were already terminated or did timeout count = 0 for callinfo in list(terminated.values()): call = callinfo.get('call') if call is not None: call.end(calltime=callinfo['duration']) count += 1 for callinfo in list(didtimeout.values()): call = callinfo.get('call') if call is not None: call.end(sendbye=True) count += 1 if count > 0: log.info("Removed %d already terminated call%s" % (count, 's'*(count!=1))) for callid, call in list(self.calls.items()): call.application = self if call.timer == 'running': now = time.time() remain = call.starttime + call.timelimit - now if remain < 0: call.timelimit = int(round(now - call.starttime)) remain = 0 call._setup_timer(remain) call.timer.start() elif call.timer == 'idle': call._setup_timer() # also restore users table self.users.setdefault(call.billingParty, []).append(callid) class Request(object): """A request parsed into a structure based on request type""" __methods = {'init': ('callid', 'diverter', 'ruri', 'sourceip', 'from'), 'start': ('callid', 'dialogid'), 'stop': ('callid',), 'debug': ('show',), 'terminate': ('callid',)} def __init__(self, cmd, params): if cmd not in list(self.__methods.keys()): raise InvalidRequestError("Unknown request: %s" % cmd) try: parameters = dict(re.split(r':\s+', l, 1) for l in params) except ValueError: raise InvalidRequestError("Badly formatted request") for p in self.__methods[cmd]: try: parameters[p] except KeyError: raise InvalidRequestError("Missing %s from request" % p) self.cmd = cmd self.deferred = defer.Deferred() self.__dict__.update(parameters) try: getattr(self, '_RE_%s' % self.cmd)() except AttributeError: pass def _RE_init(self): self.from_ = self.__dict__['from'] if self.cmd=='init' and self.diverter.lower()=='none': self.diverter = None try: self.prepaid except AttributeError: self.prepaid = None else: if self.prepaid.lower() == 'true': self.prepaid = True elif self.prepaid.lower() == 'false': self.prepaid = False else: self.prepaid = None try: self.call_limit = int(self.call_limit) except (AttributeError, ValueError): self.call_limit = None else: if self.call_limit <= 0: self.call_limit = None try: self.call_token except AttributeError: self.call_token = None else: if not self.call_token or self.call_token.lower() == 'none': self.call_token = None try: self.sip_application except AttributeError: self.sip_application = '' def _RE_debug(self): if self.show == 'session': try: if not self.callid: raise InvalidRequestError("Missing callid from request") except AttributeError: raise InvalidRequestError("Missing callid from request") elif self.show == 'sessions': try: self.user except AttributeError: self.user = None else: raise InvalidRequestError("Illegal value for 'show' attribute in request") def __str__(self): if self.cmd == 'init': return "%(cmd)s: callid=%(callid)s from=%(from_)s ruri=%(ruri)s diverter=%(diverter)s sourceip=%(sourceip)s prepaid=%(prepaid)s call_limit=%(call_limit)s" % self.__dict__ elif self.cmd == 'start': return "%(cmd)s: callid=%(callid)s dialogid=%(dialogid)s" % self.__dict__ elif self.cmd == 'stop': return "%(cmd)s: callid=%(callid)s" % self.__dict__ elif self.cmd == 'debug': return "%(cmd)s: show=%(show)s" % self.__dict__ elif self.cmd == 'terminate': return "%(cmd)s: callid=%(callid)s" % self.__dict__ else: return object.__str__(self) diff --git a/callcontrol/opensips.py b/callcontrol/opensips.py index f906e62..2615607 100644 --- a/callcontrol/opensips.py +++ b/callcontrol/opensips.py @@ -1,245 +1,247 @@ import json import socket -import urllib.parse +from urllib.parse import urlparse from abc import ABCMeta, abstractmethod, abstractproperty from application import log from application.configuration import ConfigSection from application.python.types import Singleton from application.process import process from application.system import unlink from random import getrandbits from twisted.internet import reactor, defer from twisted.internet.protocol import DatagramProtocol from twisted.python.failure import Failure from callcontrol import configuration_file class OpenSIPSConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'OpenSIPS' socket_path = '/run/opensips/socket' location_table = 'location' class Error(Exception): pass class TimeoutError(Error): pass class OpenSIPSError(Error): pass class NegativeReplyError(OpenSIPSError): def __init__(self, code, message): super(NegativeReplyError, self).__init__(code, message) self.code = code self.message = message def __repr__(self): return '{0.__class__.__name__}({0.code!r}, {0.message!r})'.format(self) def __str__(self): return '[{0.code}] {0.message}'.format(self) -class Request(object, metaclass=ABCMeta): +class Request(object): + __metaclass__ = ABCMeta method = abstractproperty() @abstractmethod def __init__(self, *args): self.id = '{:x}'.format(getrandbits(32)) self.args = list(args) self.deferred = defer.Deferred() @property def __data__(self): return dict(jsonrpc='2.0', id=self.id, method=self.method, params=self.args) @abstractmethod def process_response(self, response): raise NotImplementedError # noinspection PyAbstractClass class BooleanRequest(Request): """A request that returns True if successful, False otherwise""" def process_response(self, response): return not isinstance(response, Failure) class AddressReload(BooleanRequest): method = 'address_reload' def __init__(self): super(AddressReload, self).__init__() class DomainReload(BooleanRequest): method = 'domain_reload' def __init__(self): super(DomainReload, self).__init__() class EndDialog(BooleanRequest): method = 'dlg_end_dlg' def __init__(self, dialog_id): super(EndDialog, self).__init__(dialog_id) class RefreshWatchers(BooleanRequest): method = 'refresh_watchers' def __init__(self, account, refresh_type): super(RefreshWatchers, self).__init__('sip:{}'.format(account), 'presence', refresh_type) class UpdateSubscriptions(BooleanRequest): method = 'rls_update_subscriptions' def __init__(self, account): super(UpdateSubscriptions, self).__init__('sip:{}'.format(account)) class GetOnlineDevices(Request): method = 'ul_show_contact' def __init__(self, account): super(GetOnlineDevices, self).__init__(OpenSIPSConfig.location_table, account) def process_response(self, response): if isinstance(response, Failure): if response.type is NegativeReplyError and response.value.code == 404: return [] return response return [ContactData(contact) for contact in response['Contacts']] class ContactData(dict): __fields__ = {'contact', 'expires', 'received', 'user_agent'} def __init__(self, data): super(ContactData, self).__init__({key: value for key, value in ((key.lower().replace('-', '_'), value) for key, value in data.items()) if key in self.__fields__}) self.setdefault('user_agent', None) if 'received' in self: - parsed_received = urllib.parse.parse_qs(self['received']) + parsed_received = urlparse.parse_qs(self['received']) if 'target' in parsed_received: self['NAT_contact'] = parsed_received['target'][0] else: self['NAT_contact'] = self['received'] del self['received'] else: self['NAT_contact'] = self['contact'] class UNIXSocketProtocol(DatagramProtocol): noisy = False def datagramReceived(self, data, address): log.debug('Got MI response: {}'.format(data)) try: response = json.loads(data) except ValueError: code, _, message = data.partition(' ') try: code = int(code) except ValueError: log.error('MI response from OpenSIPS cannot be parsed (neither JSON nor status reply)') return # we got one of the 'code message' type of replies. This means either parsing error or internal error in OpenSIPS. # if we only have one request pending, we can associate the response with it, otherwise is impossible to tell to # which request the response corresponds. The failed request will fail with timeout later. if len(self.transport.requests) == 1: _, request = self.transport.requests.popitem() request.deferred.errback(Failure(NegativeReplyError(code, message))) log.error('MI request {.method} failed with: {} {}'.format(request, code, message)) else: log.error('Got MI status reply from OpenSIPS that cannot be associated with a request: {!r}'.format(data)) else: try: request_id = response['id'] except KeyError: log.error('MI JSON response from OpenSIPS lacks id field') return if request_id not in self.transport.requests: log.error('MI JSON response from OpenSIPS has unknown id: {!r}'.format(request_id)) return request = self.transport.requests.pop(request_id) if 'result' in response: request.deferred.callback(response['result']) elif 'error' in response: log.error('MI request {0.method} failed with: {1[error][code]} {1[error][message]}'.format(request, response)) request.deferred.errback(Failure(NegativeReplyError(response['error']['code'], response['error']['message']))) else: log.error('Invalid MI JSON response from OpenSIPS') request.deferred.errback(Failure(OpenSIPSError('Invalid MI JSON response from OpenSIPS'))) class UNIXSocketConnection(object): timeout = 3 def __init__(self): socket_path = process.runtime.file('opensips.sock') unlink(socket_path) self.path = socket_path self.transport = reactor.listenUNIXDatagram(self.path, UNIXSocketProtocol()) self.transport.requests = {} reactor.addSystemEventTrigger('during', 'shutdown', self.close) def close(self): for request in list(self.transport.requests.values()): if not request.deferred.called: request.deferred.errback(Error('shutting down')) self.transport.requests.clear() self.transport.stopListening() unlink(self.path) def send(self, request): try: self.transport.write(json.dumps(request.__data__), OpenSIPSConfig.socket_path) except socket.error as e: log.error("cannot write request to %s: %s" % (OpenSIPSConfig.socket_path, e[1])) request.deferred.errback(Failure(Error("Cannot send MI request %s to OpenSIPS" % request.method))) else: self.transport.requests[request.id] = request request.deferred.addBoth(request.process_response) reactor.callLater(self.timeout, self._did_timeout, request) log.debug('Send MI request: {}'.format(request.__data__)) return request.deferred def _did_timeout(self, request): if not request.deferred.called: request.deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout"))) self.transport.requests.pop(request.id) -class ManagementInterface(object, metaclass=Singleton): +class ManagementInterface(object): + __metaclass__ = Singleton def __init__(self): self.connection = UNIXSocketConnection() def reload_domains(self): return self.connection.send(DomainReload()) def reload_addresses(self): return self.connection.send(AddressReload()) def end_dialog(self, dialog_id): return self.connection.send(EndDialog(dialog_id)) def get_online_devices(self, account): return self.connection.send(GetOnlineDevices(account)) def refresh_watchers(self, account, refresh_type): return self.connection.send(RefreshWatchers(account, refresh_type)) def update_subscriptions(self, account): return self.connection.send(UpdateSubscriptions(account)) diff --git a/callcontrol/rating/__init__.py b/callcontrol/rating/__init__.py index 7c33653..224f786 100644 --- a/callcontrol/rating/__init__.py +++ b/callcontrol/rating/__init__.py @@ -1,348 +1,385 @@ """Rating engine interface implementation.""" import random from collections import deque from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import EndpointAddress from application import log from application.python.types import Singleton from twisted.internet.protocol import ReconnectingClientFactory from twisted.internet.error import TimeoutError -from twisted.internet import reactor, defer -from twisted.protocols.basic import LineOnlyReceiver +from twisted.internet import reactor, defer, protocol +#from twisted.protocols.basic import LineOnlyReceiver from twisted.python import failure from callcontrol import configuration_file ## ## Rating engine configuration ## class ThorNodeConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'ThorNetwork' enabled = False class RatingConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'CDRTool' timeout = 500 class TimeLimit(int): """A positive time limit (in seconds) or None""" def __new__(typ, value): if value.lower() == 'none': return None try: limit = int(value) except: raise ValueError("invalid time limit value: %r" % value) if limit < 0: raise ValueError("invalid time limit value: %r. should be positive." % value) return limit class CallControlConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'CallControl' prepaid_limit = ConfigSetting(type=TimeLimit, value=None) limit = ConfigSetting(type=TimeLimit, value=None) class RatingError(Exception): pass class RatingEngineError(RatingError): pass class RatingEngineTimeoutError(TimeoutError): pass -class RatingRequest(str): +# class RatingRequest(str): +class RatingRequest(bytes): + """ def __init__(self, command, reliable=True, **kwargs): - self.command = command + self.command = command.encode() self.reliable = reliable self.kwargs = kwargs self.deferred = defer.Deferred() def __new__(cls, command, reliable=True, **kwargs): reqstr = command + (kwargs and (' ' + ' '.join("%s=%s" % (name, value) for name, value in list(kwargs.items()))) or '') obj = str.__new__(cls, reqstr) + obj = super().__new__(cls, reqstr.encode('utf-8')) + print('RatingRequest object type: ', type(obj)) + return obj + """ + def __new__(cls, command, reliable=True, **kwargs): + reqstr = command + (kwargs and (' ' + ' '.join("%s=%s" % (name, value) for name, value in list(kwargs.items()))) or '') + obj = super().__new__(cls, reqstr.encode('utf-8')) + obj.command = command + obj.reliable = reliable + obj.kwargs = kwargs + obj.deferred = defer.Deferred() return obj + def __init__(self, *args, **kwargs): + super(RatingRequest, self).__init__() -class RatingEngineProtocol(LineOnlyReceiver): - delimiter = '\n\n' + +#class RatingEngineProtocol(LineOnlyReceiver): +class RatingEngineProtocol(protocol.Protocol): +# delimiter = '\n\n' + delimiter = b'\r\n' def __init__(self): self.connected = False self.__request = None self.__timeout_call = None self._request_queue = deque() def connectionMade(self): log.info("Connected to Rating Engine at %s:%d" % (self.transport.getPeer().host, self.transport.getPeer().port)) self.connected = True self.factory.application.connectionMade(self.transport.connector) if self._request_queue: self._send_next_request() def connectionLost(self, reason=None): log.info("Disconnected from Rating Engine at %s:%d" % (self.transport.getPeer().host, self.transport.getPeer().port)) self.connected = False if self.__request is not None: if self.__request.reliable: self._request_queue.appendleft(self.__request) self.__request = None else: self._respond("Connection with the Rating Engine is down: %s" % reason, success=False) self.factory.application.connectionLost(self.transport.connector, reason, self) - def timeoutConnection(self): - log.info("Connection to Rating Engine at %s:%d timedout" % (self.transport.getPeer().host, self.transport.getPeer().port)) - self.transport.loseConnection() +# def timeoutConnection(self): +# log.info("Connection to Rating Engine at %s:%d timed out" % (self.transport.getPeer().host, self.transport.getPeer().port)) +# self.transport.loseConnection() + + def dataReceived(self, data): + #log.debug('Rating response from %s:%S: %s' % (self.transport.getPeer().host, self.transport.getPeer().port, data)) + self.lineReceived(data.decode()) def lineReceived(self, line): - # log.debug('Got reply from rating engine: %s', line) + log.debug('Received response from rating engine %s:%s: %s' % (self.transport.getPeer().host, self.transport.getPeer().port, line.strip().replace("\n", " "))) if not line: return if self.__timeout_call is not None: self.__timeout_call.cancel() if self.__request is None: log.warning('Got reply for non-existing request: %s' % line) return try: self._respond(getattr(self, '_PE_%s' % self.__request.command.lower())(line)) except AttributeError: self._respond("Unknown command in request. Cannot handle reply. Reply is: %s" % line, success=False) except Exception as e: self._respond(str(e), success=False) def _PE_maxsessiontime(self, line): +# import pdb; pdb.set_trace() lines = line.splitlines() try: limit = lines[0].strip().capitalize() except IndexError: raise ValueError("Empty reply from rating engine") try: limit = int(limit) except: if limit == 'None': limit = None elif limit == 'Locked': pass else: raise ValueError("rating engine limit must be a positive number, None or Locked: got '%s' from %s:%s" % (limit, self.transport.getPeer().host, self.transport.getPeer().port)) else: if limit < 0: raise ValueError("rating engine limit must be a positive number, None or Locked: got '%s' from %s:%s" % (limit, self.transport.getPeer().host, self.transport.getPeer().port)) - info = dict(line.split('=', 1) for line in lines[1:]) + data_list = [line.split('=', 1) for line in lines[1:]] + info = {} + for elem in data_list: + try: + info[elem[0]] = elem[1] + except IndexError: + pass + if 'type' in info: type = info['type'].lower() if type == 'prepaid': prepaid = True elif type == 'postpaid': prepaid = False else: raise ValueError("prepaid must be either True or False: got '%s' from %s:%s" % (type, self.transport.getPeer().host, self.transport.getPeer().port)) else: prepaid = limit is not None return limit, prepaid def _PE_debitbalance(self, line): +# import pdb; pdb.set_trace() valid_answers = ('Ok', 'Failed', 'Not prepaid') lines = line.splitlines() try: result = lines[0].strip().capitalize() except IndexError: raise ValueError("Empty reply from rating engine %s:%s", (self.transport.getPeer().host, self.transport.getPeer().port)) if result not in valid_answers: log.error("Invalid reply from rating engine: got '%s' from %s:%s" % (lines[0].strip(), self.transport.getPeer().host, self.transport.getPeer().port)) log.warning('Rating engine possible failed query: %s', self.__request) raise RatingEngineError('Invalid rating engine response') elif result == 'Failed': log.warning('Rating engine failed query: %s', self.__request) raise RatingEngineError('Rating engine failed query') else: try: timelimit = int(lines[1].split('=', 1)[1].strip()) totalcost = lines[2].strip() except: log.error("Invalid reply from rating engine for DebitBalance on lines 2, 3: got '%s' from %s:%s" % ("', `".join(lines[1:3]), self.transport.getPeer().host, self.transport.getPeer().port)) timelimit = None totalcost = 0 return timelimit, totalcost def _send_next_request(self): if self.connected: self.__request = self._request_queue.popleft() - self.sendLine(self.__request) - self._set_timeout() - # log.debug('Sent request to rating engine: %s', self.__request) + self.delimiter = b'\r\n' + log.debug("Send request to rating engine %s:%s: %s" % (self.transport.getPeer().host, self.transport.getPeer().port, self.__request.decode())) +# self.sendLine(self.__request) + self.transport.write(self.__request) + #self._set_timeout() + self._set_timeout(self.factory.timeout) + log.debug('Sent request to rating engine: %s', self.__request.decode()) else: self.__request = None def _respond(self, result, success=True): if self.__request is not None: req = self.__request self.__request = None try: if success: req.deferred.callback(result) else: req.deferred.errback(failure.Failure(RatingEngineError(result))) except defer.AlreadyCalledError: log.debug('Request %s was already responded to', req) if self._request_queue: self._send_next_request() def _set_timeout(self, timeout=None): if timeout is None: timeout = self.factory.timeout - self.__timeout_call = reactor.callLater(timeout/1000.0, self.timeoutConnection) +# self.__timeout_call = reactor.callLater(timeout/1000, self.timeoutConnection) def send_request(self, request): if not request.reliable and not self.connected: request.deferred.errback(failure.Failure(RatingEngineError("Connection with the Rating Engine is down"))) return request self._request_queue.append(request) if self.__request is None: self._send_next_request() return request class RatingEngineFactory(ReconnectingClientFactory): protocol = RatingEngineProtocol timeout = RatingConfig.timeout # reconnect parameters maxDelay = 15 factor = maxDelay initialDelay = 1.0/factor delay = initialDelay def __init__(self, application): self.application = application def buildProtocol(self, addr): self.resetDelay() return ReconnectingClientFactory.buildProtocol(self, addr) def clientConnectionFailed(self, connector, reason): if self.application.disconnecting: return ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) def clientConnectionLost(self, connector, reason): if self.application.disconnecting: return ReconnectingClientFactory.clientConnectionLost(self, connector, reason) class DummyRatingEngine(object): def getCallLimit(self, call, max_duration=CallControlConfig.prepaid_limit, reliable=True): return defer.fail(failure.Failure(RatingEngineError("Connection with the Rating Engine not yet established"))) def debitBalance(self, call, reliable=True): return defer.fail(failure.Failure(RatingEngineError("Connection with the Rating Engine not yet established"))) class RatingEngine(object): def __init__(self, address): self.address = address self.disconnecting = False self.connector = reactor.connectTCP(self.address[0], self.address[1], factory=RatingEngineFactory(self)) self.connection = None self.__unsent_req = deque() def shutdown(self): self.disconnecting = True self.connector.disconnect() def connectionMade(self, connector): self.connection = connector.transport self.connection.protocol._request_queue.extend(self.__unsent_req) for req in self.__unsent_req: log.debug('Re-queueing request for the rating engine: %s', req) self.__unsent_req.clear() def connectionLost(self, connector, reason, protocol): while protocol._request_queue: req = protocol._request_queue.pop() if not req.reliable: log.debug('Request is considered failed: %s', req) req.deferred.errback(failure.Failure(RatingEngineError("Connection with the Rating Engine is down"))) else: log.debug('Saving request to be requeued later: %s', req) self.__unsent_req.appendleft(req) self.connection = None def getCallLimit(self, call, max_duration=CallControlConfig.prepaid_limit, reliable=True): + #import pdb; pdb.set_trace() max_duration = max_duration or CallControlConfig.limit or 36000 args = {} if call.inprogress: args['State'] = 'Connected' req = RatingRequest('MaxSessionTime', reliable=reliable, CallId=call.callid, From=call.billingParty, To=call.ruri, Gateway=call.sourceip, Duration=max_duration, Application=call.sip_application, **args) if self.connection is not None: return self.connection.protocol.send_request(req).deferred else: self.__unsent_req.append(req) return req.deferred def debitBalance(self, call, reliable=True): req = RatingRequest('DebitBalance', reliable=reliable, CallId=call.callid, From=call.billingParty, To=call.ruri, Gateway=call.sourceip, Duration=call.duration, Application=call.sip_application) if self.connection is not None: return self.connection.protocol.send_request(req).deferred else: self.__unsent_req.append(req) return req.deferred class RatingEngineAddress(EndpointAddress): default_port = 9024 name = 'rating engine address' -class RatingEngineConnections(object, metaclass=Singleton): +class RatingEngineConnections(object): + __metaclass__ = Singleton def __init__(self): self.user_connections = {} if not ThorNodeConfig.enabled: from callcontrol.rating.backends.opensips import OpensipsBackend self.backend = OpensipsBackend() else: from callcontrol.rating.backends.sipthor import SipthorBackend self.backend = SipthorBackend() @staticmethod def getConnection(call=None): engines = RatingEngineConnections() try: conn = random.choice(engines.backend.connections) except IndexError: return DummyRatingEngine() if call is None: return conn return engines.user_connections.setdefault(call.billingParty, conn) def remove_user(self, user): try: del self.user_connections[user] except KeyError: pass def shutdown(self): return defer.maybeDeferred(self.backend.shutdown) diff --git a/callcontrol/rating/backends/sipthor.py b/callcontrol/rating/backends/sipthor.py index 7b812a0..a9a4df0 100644 --- a/callcontrol/rating/backends/sipthor.py +++ b/callcontrol/rating/backends/sipthor.py @@ -1,128 +1,158 @@ + from application.version import Version from application.configuration import ConfigSection, ConfigSetting from application.system import host from application import log from application.python.types import Singleton from gnutls.interfaces.twisted import TLSContext, X509Credentials from thor import __version__ as thor_version from thor.eventservice import EventServiceClient, ThorEvent from thor.entities import ThorEntitiesRoleMap, GenericThorEntity as ThorEntity from callcontrol.tls import Certificate, PrivateKey from callcontrol.rating import RatingEngine, RatingEngineAddress from callcontrol import configuration_file, __version__ from twisted.internet import defer, reactor if Version.parse(thor_version) < Version.parse('1.1.21'): raise RuntimeError('Thor version is smaller than 1.1.21 (%s)' % thor_version) class ThorNodeConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'ThorNetwork' enabled = False domain = "sipthor.net" multiply = 1000 certificate = ConfigSetting(type=Certificate, value=None) private_key = ConfigSetting(type=PrivateKey, value=None) ca = ConfigSetting(type=Certificate, value=None) class CallcontrolNode(EventServiceClient, metaclass=Singleton): topics = ["Thor.Members"] def __init__(self): self.node = ThorEntity(host.default_ip, ['call_control'], version=__version__) self.networks = {} self.rating_connections = {} self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) credentials = X509Credentials(ThorNodeConfig.certificate, ThorNodeConfig.private_key, [ThorNodeConfig.ca]) credentials.verify_peer = True tls_context = TLSContext(credentials) EventServiceClient.__init__(self, ThorNodeConfig.domain, tls_context) def publish(self, event): self._publish(event) def stop(self): return self._shutdown() def connectionLost(self, connector, reason): """Called when an event server connection goes away""" self.connections.discard(connector.transport) def connectionFailed(self, connector, reason): """Called when an event server connection has an unrecoverable error""" connector.failed = True def _disconnect_all(self, result): for conn in self.connectors: conn.disconnect() def _shutdown(self): if self.disconnecting: return self.disconnecting = True self.dns_monitor.cancel() if self.advertiser: self.advertiser.cancel() if self.shutdown_message: self._publish(self.shutdown_message) requests = [conn.protocol.unsubscribe(*self.topics) for conn in self.connections] d = defer.DeferredList([request.deferred for request in requests]) d.addCallback(self._disconnect_all) return d def handle_event(self, event): reactor.callFromThread(self._handle_event, event) def _handle_event(self, event): networks = self.networks role_map = ThorEntitiesRoleMap(event.message) # mapping between role names and lists of nodes with that role role = 'rating_server' try: network = networks[role] except KeyError: from thor import network as thor_network network = thor_network.new(ThorNodeConfig.multiply) networks[role] = network - new_nodes = set([node.ip for node in role_map.get(role, [])]) - old_nodes = set(network.nodes) + else: + first_run = False + ips = [] + for node in role_map.get(role, []): + if isinstance(node.ip, bytes): + ips.append(node.ip.decode('utf-8')) + else: + ips.append(node.ip) + nodes = [] + for node in network.nodes: + if isinstance(node, bytes): + nodes.append(node.decode('utf-8')) + else: + nodes.append(node) + new_nodes = set(ips) + old_nodes = set(nodes) + # old_nodes = set(network.nodes) added_nodes = new_nodes - old_nodes removed_nodes = old_nodes - new_nodes if added_nodes: + log.debug('added nodes: %s', added_nodes) for node in added_nodes: - network.add_node(node) - address = RatingEngineAddress(node) + if isinstance(node, str): + network.add_node(node.encode()) + address = RatingEngineAddress(node) + else: + network.add_node(node) + address = RatingEngineAddress(node.decode()) self.rating_connections[address] = RatingEngine(address) plural = 's' if len(added_nodes) != 1 else '' - log.info('Added rating node%s: %s', plural, ', '.join(added_nodes)) +# log.info('Added rating node%s: %s', plural, ', '.join(added_nodes)) + added_nodes_str = [node for node in added_nodes] + log.info("added %s node%s: %s" % (role, plural, ', '.join(added_nodes_str))) if removed_nodes: + log.debug('removed nodes: %s', removed_nodes) for node in removed_nodes: - network.remove_node(node) - address = RatingEngineAddress(node) + if isinstance(node, str): + network.remove_node(node.encode()) + address = RatingEngineAddress(node) + else: + network.remove_node(node) + address = RatingEngineAddress(node.decode()) self.rating_connections[address].shutdown() del self.rating_connections[address] plural = 's' if len(removed_nodes) != 1 else '' - log.info('Removed rating node%s: %s', plural, ', '.join(removed_nodes)) +# log.info('Removed rating node%s: %s', plural, ', '.join(removed_nodes)) + removed_nodes_str = [node for node in removed_nodes] + log.info("removed %s node%s: %s" % (role, plural, ', '.join(removed_nodes_str))) class SipthorBackend(object): def __init__(self): self.node = CallcontrolNode() @property def connections(self): return list(self.node.rating_connections.values()) def shutdown(self): for connection in self.connections: connection.shutdown() return self.node.stop() diff --git a/callcontrol/scheduler.py b/callcontrol/scheduler.py index 35714fb..2519901 100644 --- a/callcontrol/scheduler.py +++ b/callcontrol/scheduler.py @@ -1,46 +1,46 @@ """Schedule calls on a twisted reactor""" __all__ = ['RecurrentCall', 'KeepRunning'] from time import time class KeepRunning: """Return this class from a recurrent function to indicate that it should keep running""" pass class RecurrentCall(object): """Execute a function repeatedly at the given interval, until signaled to stop""" def __init__(self, period, func, *args, **kwargs): from twisted.internet import reactor self.func = func self.args = args self.kwargs = kwargs self.period = period self.now = None self.next = None self.callid = reactor.callLater(period, self) def __call__(self): from twisted.internet import reactor self.callid = None if self.now is None: self.now = time() self.next = self.now + self.period else: self.now, self.next = self.next, self.next + self.period result = self.func(*self.args, **self.kwargs) if result is KeepRunning: - delay = max(self.__next__-time(), 0) + delay = max(self.next - time(), 0) self.callid = reactor.callLater(delay, self) def cancel(self): if self.callid is not None: try: self.callid.cancel() except ValueError: pass self.callid = None diff --git a/callcontrol/sip.py b/callcontrol/sip.py index 2273ea2..76912fd 100644 --- a/callcontrol/sip.py +++ b/callcontrol/sip.py @@ -1,327 +1,330 @@ """ Implementation of Call objects used to store call information and manage a call. """ import time import re from application import log from twisted.internet.error import AlreadyCalled from twisted.internet import reactor, defer from callcontrol.rating import RatingEngineConnections from callcontrol.opensips import ManagementInterface class CallError(Exception): pass ## ## Call data types ## class ReactorTimer(object): def __init__(self, delay, function, args=[], kwargs={}): self.calldelay = delay self.function = function self.args = args self.kwargs = kwargs self.dcall = None def start(self): if self.dcall is None: self.dcall = reactor.callLater(self.calldelay, self.function, *self.args, **self.kwargs) def cancel(self): if self.dcall is not None: try: self.dcall.cancel() except AlreadyCalled: self.dcall = None def delay(self, seconds): if self.dcall is not None: try: self.dcall.delay(seconds) except AlreadyCalled: self.dcall = None def reset(self, seconds): if self.dcall is not None: try: self.dcall.reset(seconds) except AlreadyCalled: self.dcall = None class Structure(dict): def __init__(self): dict.__init__(self) def __getitem__(self, key): elements = key.split('.') obj = self ## start with ourselves for e in elements: if not isinstance(obj, dict): raise TypeError("unsubscriptable object") obj = dict.__getitem__(obj, e) return obj def __setitem__(self, key, value): self.__dict__[key] = value dict.__setitem__(self, key, value) def __delitem__(self, key): dict.__delitem__(self, key) del self.__dict__[key] __setattr__ = __setitem__ def __delattr__(self, name): try: del self.__dict__[name] except KeyError: raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, name)) else: dict.__delitem__(self, name) def update(self, other): dict.update(self, other) for key, value in list(other.items()): self.__dict__[key] = value class Call(Structure): """Defines a call""" def __init__(self, request, application): Structure.__init__(self) self.prepaid = request.prepaid self.locked = False ## if the account is locked because another call is in progress self.expired = False ## if call did consume its timelimit before being terminated self.created = time.time() self.timer = None self.starttime = None self.endtime = None self.timelimit = None self.duration = 0 self.callid = request.callid self.dialogid = None self.diverter = request.diverter self.ruri = request.ruri self.sourceip = request.sourceip self.token = request.call_token self.sip_application = request.sip_application self['from'] = request.from_ ## from is a python keyword ## Determine who will pay for the call if self.diverter is not None: self.billingParty = 'sip:%s' % self.diverter self.user = self.diverter else: match = re.search(r'(?P
sip:(?P