diff --git a/callcontrol/opensips.py b/callcontrol/opensips.py index 1543210..e290c85 100644 --- a/callcontrol/opensips.py +++ b/callcontrol/opensips.py @@ -1,179 +1,249 @@ -"""The OpenSIPS Management Interface""" - - +import json import socket -from collections import deque -from twisted.internet import reactor, defer -from twisted.internet.protocol import DatagramProtocol -from twisted.internet.error import CannotListenError -from twisted.python.failure import Failure +import urlparse + +from abc import ABCMeta, abstractmethod, abstractproperty +from application import log from application.configuration import ConfigSection from application.python.types import Singleton from application.process import process from application.system import unlink -from application import log +from random import getrandbits +from twisted.internet import reactor, defer +from twisted.internet.protocol import DatagramProtocol +from twisted.python.failure import Failure from callcontrol import configuration_filename class OpenSIPSConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'OpenSIPS' + socket_path = '/var/run/opensips/socket' - max_connections = 10 + location_table = 'location' -class Error(Exception): pass -class CommandError(Error): pass -class TimeoutError(Error): pass -class NegativeReplyError(Error): pass +class Error(Exception): + pass -class DialogID(str): - def __new__(cls, did): - if did is None: - return None - try: - h_entry, h_id = did.split(':') - except: - log.error("invalid dialog_id value: `%s'" % did) - return None - instance = str.__new__(cls, did) - instance.h_entry = h_entry - instance.h_id = h_id - return instance +class TimeoutError(Error): + pass + + +class OpenSIPSError(Error): + pass + + +class NegativeReplyError(OpenSIPSError): + def __init__(self, code, message): + super(NegativeReplyError, self).__init__(code, message) + self.code = code + self.message = message + + def __repr__(self): + return '{0.__class__.__name__}({0.code!r}, {0.message!r})'.format(self) + + def __str__(self): + return '[{0.code}] {0.message}'.format(self) + class Request(object): - def __init__(self, command): - self.command = command + __metaclass__ = ABCMeta + + method = abstractproperty() + + @abstractmethod + def __init__(self, *args): + self.id = '{:x}'.format(getrandbits(32)) + self.args = list(args) self.deferred = defer.Deferred() + @property + def __data__(self): + return dict(jsonrpc='2.0', id=self.id, method=self.method, params=self.args) + + @abstractmethod + def process_response(self, response): + raise NotImplementedError + + +# noinspection PyAbstractClass +class BooleanRequest(Request): + """A request that returns True if successful, False otherwise""" + def process_response(self, response): + return not isinstance(response, Failure) + + +class AddressReload(BooleanRequest): + method = 'address_reload' + + def __init__(self): + super(AddressReload, self).__init__() + + +class DomainReload(BooleanRequest): + method = 'domain_reload' + + def __init__(self): + super(DomainReload, self).__init__() + + +class EndDialog(BooleanRequest): + method = 'dlg_end_dlg' + + def __init__(self, dialog_id): + super(EndDialog, self).__init__(dialog_id) + + +class RefreshWatchers(BooleanRequest): + method = 'refresh_watchers' + + def __init__(self, account, refresh_type): + super(RefreshWatchers, self).__init__('sip:{}'.format(account), 'presence', refresh_type) + + +class UpdateSubscriptions(BooleanRequest): + method = 'rls_update_subscriptions' + + def __init__(self, account): + super(UpdateSubscriptions, self).__init__('sip:{}'.format(account)) + + +class GetOnlineDevices(Request): + method = 'ul_show_contact' + + def __init__(self, account): + super(GetOnlineDevices, self).__init__(OpenSIPSConfig.location_table, account) + + def process_response(self, response): + if isinstance(response, Failure): + if response.type is NegativeReplyError and response.value.code == 404: + return [] + return response + return [ContactData(contact) for contact in response[u'Contacts']] + + +class ContactData(dict): + __fields__ = {u'contact', u'expires', u'received', u'user_agent'} + + def __init__(self, data): + super(ContactData, self).__init__({key: value for key, value in ((key.lower().replace(u'-', u'_'), value) for key, value in data.iteritems()) if key in self.__fields__}) + self.setdefault(u'user_agent', None) + if u'received' in self: + parsed_received = urlparse.parse_qs(self[u'received']) + if u'target' in parsed_received: + self[u'NAT_contact'] = parsed_received[u'target'][0] + else: + self[u'NAT_contact'] = self[u'received'] + del self[u'received'] + else: + self[u'NAT_contact'] = self[u'contact'] + class UNIXSocketProtocol(DatagramProtocol): noisy = False def datagramReceived(self, data, address): - deferred = self.transport.deferred - if deferred is None or deferred.called: - return - # accumulate in a buffer until message end (do this later when implemented by opensips) -Dan - if not data: - failure = Failure(CommandError("Empty reply from OpenSIPS")) - deferred.errback(failure) - return + log.debug('Got MI response: {}'.format(data)) try: - status, msg = data.split('\n', 1) + response = json.loads(data) except ValueError: - failure = Failure(CommandError("Missing line terminator after status line in OpenSIPS reply")) - deferred.errback(failure) - return - if status.upper() == '200 OK': - deferred.callback(msg) + code, _, message = data.partition(' ') + try: + code = int(code) + except ValueError: + log.error('Received un-parsable response from OpenSIPS: {!r}'.format(data)) + return + # we got one of the 'code message' type of replies. This means either parsing error or internal error in OpenSIPS. + # if we only have one request pending, we can associate the response with it, otherwise is impossible to tell to + # which request the response corresponds. The failed request will fail with timeout later. + if len(self.transport.requests) == 1: + _, request = self.transport.requests.popitem() + request.deferred.errback(Failure(NegativeReplyError(code, message))) + log.error('MI request {.method} failed with: {} {}'.format(request, code, message)) + else: + log.error('Got non-JSON error reply from OpenSIPS that cannot be associated with a request: {!r}'.format(data)) else: - deferred.errback(Failure(NegativeReplyError(status))) + try: + request_id = response['id'] + except KeyError: + log.error('MI JSON response from OpenSIPS lacks id field: {!r}'.format(response)) + return + if request_id not in self.transport.requests: + log.error('Received MI response from OpenSIPS with unknown id: {!r}'.format(response)) + return + request = self.transport.requests.pop(request_id) + if 'result' in response: + request.deferred.callback(response['result']) + elif 'error' in response: + log.error('MI request {0.method} failed with: {1[error][code]} {1[error][message]}'.format(request, response)) + request.deferred.errback(Failure(NegativeReplyError(response['error']['code'], response['error']['message']))) + else: + log.error('Got invalid MI response from OpenSIPS: {!r}'.format(response)) + request.deferred.errback(Failure(OpenSIPSError('Invalid response from OpenSIPS'))) class UNIXSocketConnection(object): timeout = 3 - def __init__(self, socket_path): - self._initialized = False + def __init__(self): + socket_path = process.runtime_file('opensips.sock') + unlink(socket_path) self.path = socket_path self.transport = reactor.listenUNIXDatagram(self.path, UNIXSocketProtocol()) + self.transport.requests = {} reactor.addSystemEventTrigger('during', 'shutdown', self.close) - self.transport.deferred = None ## placeholder for the deferred used by a request - self._initialized = True def close(self): - if self._initialized: - self.transport.stopListening() - unlink(self.path) - - def _get_deferred(self): - return self.transport.deferred - def _set_deferred(self, d): - self.transport.deferred = d - deferred = property(_get_deferred, _set_deferred) - - def _did_timeout(self, deferred): - if deferred.called: - return - deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout"))) + for request in self.transport.requests.values(): + if not request.deferred.called: + request.deferred.errback(Error('shutting down')) + self.transport.requests.clear() + self.transport.stopListening() + unlink(self.path) def send(self, request): - self.deferred = request.deferred try: - self.transport.write(request.command, OpenSIPSConfig.socket_path) - except socket.error, why: - log.error("cannot write request to %s: %s" % (OpenSIPSConfig.socket_path, why[1])) - self.deferred.errback(Failure(CommandError("Cannot send request to OpenSIPS"))) + self.transport.write(json.dumps(request.__data__), OpenSIPSConfig.socket_path) + except socket.error as e: + log.error("cannot write request to %s: %s" % (OpenSIPSConfig.socket_path, e[1])) + request.deferred.errback(Failure(Error("Cannot send MI request %s to OpenSIPS" % request.method))) else: - reactor.callLater(self.timeout, self._did_timeout, self.deferred) - - -class UNIXSocketConnectionPool(object): - """Pool of UNIX socket connection to OpenSIPS""" - - def __init__(self, max_connections=10, pool_id=''): - assert max_connections > 0, 'maximum should be > 0' - self.max = max_connections - self.id = pool_id - self.workers = 0 - self.waiters = deque() - self.connections = deque() - - def _create_connections_as_needed(self): - while self.workers < self.max and len(self.waiters) > len(self.connections): - socket_name = "opensips_%s%02d.sock" % (self.id, self.workers+1) - socket_path = process.runtime_file(socket_name) - unlink(socket_path) - try: - conn = UNIXSocketConnection(socket_path) - except CannotListenError, why: - log.error("cannot create an OpenSIPS UNIX socket connection: %s" % str(why)) - break - self.connections.append(conn) - self.workers += 1 - - def _release_connection(self, result, conn): - self.connections.append(conn) - self._process_waiters() - return result - - def _process_waiters(self): - while self.waiters: - try: - conn = self.connections.popleft() - except IndexError: - return - request = self.waiters.popleft() - request.deferred.addBoth(self._release_connection, conn) - conn.send(request) - - def defer_to_connection(self, command): - request = Request(command) - self.waiters.append(request) - self._create_connections_as_needed() - self._process_waiters() + self.transport.requests[request.id] = request + request.deferred.addBoth(request.process_response) + reactor.callLater(self.timeout, self._did_timeout, request) + log.debug('Send MI request: {}'.format(request.__data__)) return request.deferred + def _did_timeout(self, request): + if not request.deferred.called: + request.deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout"))) + self.transport.requests.pop(request.id) + class ManagementInterface(object): __metaclass__ = Singleton - + def __init__(self): - self.pool = UNIXSocketConnectionPool(OpenSIPSConfig.max_connections) + self.connection = UNIXSocketConnection() - ## Reply handlers __RH_xxx - - def __RH_end_dialog(self, result): - if isinstance(result, Failure): - log.error("failed to end dialog: %s" % result.value) - return False - return True + def reload_domains(self): + return self.connection.send(DomainReload()) + + def reload_addresses(self): + return self.connection.send(AddressReload()) def end_dialog(self, dialog_id): - cmd = ':dlg_end_dlg:\n%s\n%s\n\n' % (dialog_id.h_entry, dialog_id.h_id) - return self.pool.defer_to_connection(cmd).addBoth(self.__RH_end_dialog) + return self.connection.send(EndDialog(dialog_id)) + + def get_online_devices(self, account): + return self.connection.send(GetOnlineDevices(account)) + + def refresh_watchers(self, account, refresh_type): + return self.connection.send(RefreshWatchers(account, refresh_type)) + def update_subscriptions(self, account): + return self.connection.send(UpdateSubscriptions(account)) diff --git a/callcontrol/sip.py b/callcontrol/sip.py index eb1d175..8b3c529 100644 --- a/callcontrol/sip.py +++ b/callcontrol/sip.py @@ -1,324 +1,324 @@ """ Implementation of Call objects used to store call information and manage a call. """ import time import re from application import log from twisted.internet.error import AlreadyCalled from twisted.internet import reactor, defer from callcontrol.rating import RatingEngineConnections -from callcontrol.opensips import DialogID, ManagementInterface +from callcontrol.opensips import ManagementInterface class CallError(Exception): pass ## ## Call data types ## class ReactorTimer(object): def __init__(self, delay, function, args=[], kwargs={}): self.calldelay = delay self.function = function self.args = args self.kwargs = kwargs self.dcall = None def start(self): if self.dcall is None: self.dcall = reactor.callLater(self.calldelay, self.function, *self.args, **self.kwargs) def cancel(self): if self.dcall is not None: try: self.dcall.cancel() except AlreadyCalled: self.dcall = None def delay(self, seconds): if self.dcall is not None: try: self.dcall.delay(seconds) except AlreadyCalled: self.dcall = None def reset(self, seconds): if self.dcall is not None: try: self.dcall.reset(seconds) except AlreadyCalled: self.dcall = None class Structure(dict): def __init__(self): dict.__init__(self) def __getitem__(self, key): elements = key.split('.') obj = self ## start with ourselves for e in elements: if not isinstance(obj, dict): raise TypeError("unsubscriptable object") obj = dict.__getitem__(obj, e) return obj def __setitem__(self, key, value): self.__dict__[key] = value dict.__setitem__(self, key, value) def __delitem__(self, key): dict.__delitem__(self, key) del self.__dict__[key] __setattr__ = __setitem__ def __delattr__(self, name): try: del self.__dict__[name] except KeyError: raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, name)) else: dict.__delitem__(self, name) def update(self, other): dict.update(self, other) for key, value in other.items(): self.__dict__[key] = value class Call(Structure): """Defines a call""" def __init__(self, request, application): Structure.__init__(self) self.prepaid = request.prepaid self.locked = False ## if the account is locked because another call is in progress self.expired = False ## if call did consume its timelimit before being terminated self.created = time.time() self.timer = None self.starttime = None self.endtime = None self.timelimit = None self.duration = 0 self.callid = request.callid self.dialogid = None self.diverter = request.diverter self.ruri = request.ruri self.sourceip = request.sourceip self.token = request.call_token self.sip_application = request.sip_application self['from'] = request.from_ ## from is a python keyword ## Determine who will pay for the call if self.diverter is not None: self.billingParty = 'sip:%s' % self.diverter self.user = self.diverter else: match = re.search(r'(?P
sip:(?P