diff --git a/mediaproxy/dispatcher.py b/mediaproxy/dispatcher.py index 57af527..61a1e89 100644 --- a/mediaproxy/dispatcher.py +++ b/mediaproxy/dispatcher.py @@ -1,555 +1,600 @@ """Implementation of the MediaProxy dispatcher""" - +import hashlib import random import signal import cPickle as pickle import cjson +from base64 import b64encode as base64_encode from collections import deque from itertools import ifilter from time import time from application import log from application.process import process from application.system import unlink from gnutls.errors import CertificateSecurityError from gnutls.interfaces.twisted import TLSContext from twisted.protocols.basic import LineOnlyReceiver from twisted.python import failure from twisted.internet.error import ConnectionDone, TCPTimedOutError from twisted.internet.protocol import Factory, connectionDone from twisted.internet.defer import Deferred, DeferredList, maybeDeferred, succeed from twisted.internet import reactor from mediaproxy import __version__ from mediaproxy.configuration import DispatcherConfig from mediaproxy.interfaces import opensips from mediaproxy.scheduler import RecurrentCall, KeepRunning from mediaproxy.tls import X509Credentials class CommandError(Exception): pass class Command(object): def __init__(self, name, headers=None): self.name = name self.headers = headers or [] try: self.parsed_headers = dict(header.split(': ', 1) for header in self.headers) except Exception: raise CommandError('Could not parse command headers') + else: + self.__dict__['session_id'] = None if self.call_id is None else base64_encode(hashlib.md5(self.call_id).digest()).rstrip('=') @property def call_id(self): return self.parsed_headers.get('call_id') + @property + def dialog_id(self): + return self.parsed_headers.get('dialog_id') + + @property + def session_id(self): + return self.__dict__['session_id'] + + +class ProtocolLogger(log.ContextualLogger): + def __init__(self, name): + super(ProtocolLogger, self).__init__(logger=log.get_logger()) # use the main logger as backend + self.name = name + + def apply_context(self, message): + return '[{0}] {1}'.format(self.name, message) if message != '' else '' + + +class SessionLogger(log.ContextualLogger): + def __init__(self, session): + super(SessionLogger, self).__init__(logger=log.get_logger()) # use the main logger as backend + self.session_id = session.session_id + self.relay_ip = session.relay_ip + + def apply_context(self, message): + return '[session {0.session_id} at {0.relay_ip}] {1}'.format(self, message) if message != '' else '' + class ControlProtocol(LineOnlyReceiver): + logger = None # type: ProtocolLogger noisy = False def __init__(self): self.in_progress = 0 def lineReceived(self, line): raise NotImplementedError() def connectionLost(self, reason=connectionDone): if isinstance(reason.value, connectionDone.type): - log.debug('Connection to {} closed'.format(self.description)) + self.logger.info('Connection closed') else: - log.debug('Connection to {} lost: {}'.format(self.description, reason.value)) + self.logger.warning('Connection lost: {}'.format(reason.value)) self.factory.connection_lost(self) def reply(self, reply): self.transport.write(reply + self.delimiter) def _error_handler(self, failure): failure.trap(CommandError, RelayError) - log.error(failure.value) + self.logger.error(failure.value) self.reply('error') def _catch_all(self, failure): - log.error(failure.getTraceback()) + self.logger.error(failure.getTraceback()) self.reply('error') def _decrement(self, result): self.in_progress = 0 if self.factory.shutting_down: self.transport.loseConnection() def _add_callbacks(self, defer): defer.addCallback(self.reply) defer.addErrback(self._error_handler) defer.addErrback(self._catch_all) defer.addBoth(self._decrement) class OpenSIPSControlProtocol(ControlProtocol): - description = 'OpenSIPS' + logger = ProtocolLogger(name='OpenSIPS Interface') def __init__(self): self.request_lines = [] ControlProtocol.__init__(self) def lineReceived(self, line): if line == '': if self.request_lines: self.in_progress += 1 defer = maybeDeferred(self.handle_request, self.request_lines) self._add_callbacks(defer) self.request_lines = [] elif not line.endswith(': '): self.request_lines.append(line) def handle_request(self, request_lines): command = Command(name=request_lines[0], headers=request_lines[1:]) if command.call_id is None: - raise CommandError('Request from OpenSIPS is missing the call_id header') + raise CommandError('Request is missing the call_id header') return self.factory.dispatcher.send_command(command) class ManagementControlProtocol(ControlProtocol): - description = 'Management Interface' + logger = ProtocolLogger(name='Management Interface') def connectionMade(self): if DispatcherConfig.management_use_tls and DispatcherConfig.management_passport is not None: peer_cert = self.transport.getPeerCertificate() if not DispatcherConfig.management_passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) return def lineReceived(self, line): if line in ['quit', 'exit']: self.transport.loseConnection() elif line == 'summary': defer = self.factory.dispatcher.relay_factory.get_summary() self._add_callbacks(defer) elif line == 'sessions': defer = self.factory.dispatcher.relay_factory.get_statistics() self._add_callbacks(defer) elif line == 'version': self.reply(__version__) else: - log.error('Unknown command on management interface: %s' % line) + self.logger.error('Unknown command: %s' % line) self.reply('error') class ControlFactory(Factory): noisy = False def __init__(self, dispatcher): self.dispatcher = dispatcher self.protocols = [] self.shutting_down = False def buildProtocol(self, addr): protocol = Factory.buildProtocol(self, addr) self.protocols.append(protocol) return protocol def connection_lost(self, prot): self.protocols.remove(prot) if self.shutting_down and len(self.protocols) == 0: self.defer.callback(None) def shutdown(self): if self.shutting_down: return self.shutting_down = True if len(self.protocols) == 0: return succeed(None) else: for prot in self.protocols: if prot.in_progress == 0: prot.transport.loseConnection() self.defer = Deferred() return self.defer class OpenSIPSControlFactory(ControlFactory): protocol = OpenSIPSControlProtocol class ManagementControlFactory(ControlFactory): protocol = ManagementControlProtocol class RelayError(Exception): pass class ConnectionReplaced(ConnectionDone): pass class RelayServerProtocol(LineOnlyReceiver): + MAX_LENGTH = 4096*1024 # 4MB noisy = False - MAX_LENGTH = 4096*1024 ## (4MB) def __init__(self): + self.ip = None # type: str + self.logger = None # type: ProtocolLogger self.commands = {} self.halting = False self.timedout = False self.disconnect_timer = None self.sequence_number = 0 self.authenticated = False @property def active(self): return not self.halting and not self.timedout def send_command(self, command): - log.debug('Issuing %r command to relay at %s' % (command.name, self.ip)) + if command.call_id: + self.logger.info('Requesting {0.name!r} for session {0.session_id}'.format(command)) + else: + self.logger.info('Requesting {0.name!r}'.format(command)) sequence_number = str(self.sequence_number) self.sequence_number += 1 defer = Deferred() timer = reactor.callLater(DispatcherConfig.relay_timeout, self._timeout, sequence_number) self.commands[sequence_number] = (command, defer, timer) self.transport.write(self.delimiter.join(['{} {}'.format(command.name, sequence_number)] + command.headers) + 2*self.delimiter) return defer def reply(self, reply): self.transport.write(reply + self.delimiter) def _timeout(self, sequence_number): command, defer, timer = self.commands.pop(sequence_number) defer.errback(RelayError('%r command failed: relay at %s timed out' % (command.name, self.ip))) if self.timedout is False: self.timedout = True self.disconnect_timer = reactor.callLater(DispatcherConfig.relay_recover_interval, self.transport.connectionLost, failure.Failure(TCPTimedOutError())) def connectionMade(self): if DispatcherConfig.passport is not None: peer_cert = self.transport.getPeerCertificate() if not DispatcherConfig.passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) return self.authenticated = True self.factory.new_relay(self) def lineReceived(self, line): try: first, rest = line.split(' ', 1) except ValueError: first = line rest = '' if first == 'expired': try: stats = cjson.decode(rest) - except cjson.DecodeError: - log.error('Error decoding JSON from relay at %s' % self.ip) + except cjson.DecodeError as e: + self.logger.error('Could not decode JSON: {}'.format(e)) else: call_id = stats['call_id'] session = self.factory.sessions.get(call_id, None) if session is None: - log.error('Unknown session with call_id %s expired at relay %s' % (call_id, self.ip)) + self.logger.error('Expired session has unknown call_id %s' % call_id) return if session.relay_ip != self.ip: - log.error('session with call_id %s expired at relay %s, but is actually at relay %s, ignoring' % (call_id, self.ip, session.relay_ip)) + session.logger.error('relay at %s reported the session as expired, ignoring' % self.ip) return all_streams_ice = all(stream_info['status'] == 'unselected ICE candidate' for stream_info in stats['streams']) if all_streams_ice: - log.info('session with call_id %s from relay %s removed because ICE was used' % (call_id, session.relay_ip)) + session.logger.info('removed because ICE was used') stats['timed_out'] = False else: - log.info('session with call_id %s from relay %s did timeout' % (call_id, session.relay_ip)) + session.logger.info('did timeout') stats['timed_out'] = True stats['dialog_id'] = session.dialog_id stats['all_streams_ice'] = all_streams_ice - self.factory.dispatcher.update_statistics(stats) + self.factory.dispatcher.update_statistics(session, stats) if session.dialog_id is not None and stats['start_time'] is not None and not all_streams_ice: self.factory.dispatcher.opensips_management.end_dialog(session.dialog_id) session.expire_time = time() else: del self.factory.sessions[call_id] return elif first == 'ping': if self.timedout is True: self.timedout = False if self.disconnect_timer.active(): self.disconnect_timer.cancel() self.disconnect_timer = None self.reply('pong') return try: command, defer, timer = self.commands.pop(first) except KeyError: - log.error('Got unexpected response from relay at %s: %s' % (self.ip, line)) + self.logger.error('Got unexpected response: {}'.format(line)) return timer.cancel() if rest == 'error': - defer.errback(RelayError('Received error from relay at %s in response to %r command' % (self.ip, command.name))) + defer.errback(RelayError('Relay replied with error')) elif rest == 'halting': self.halting = True - defer.errback(RelayError('Relay at %s is shutting down' % self.ip)) + defer.errback(RelayError('Relay is shutting down')) elif command.name == 'remove': try: stats = cjson.decode(rest) except cjson.DecodeError: - log.error('Error decoding JSON from relay at %s' % self.ip) + self.logger.error('Error decoding JSON') else: call_id = stats['call_id'] session = self.factory.sessions[call_id] stats['dialog_id'] = session.dialog_id stats['timed_out'] = False - self.factory.dispatcher.update_statistics(stats) + self.factory.dispatcher.update_statistics(session, stats) del self.factory.sessions[call_id] defer.callback('removed') else: # update command defer.callback(rest) def connectionLost(self, reason=connectionDone): if reason.type == ConnectionDone: - log.info('Connection with relay at %s was closed' % self.ip) + self.logger.info('Connection closed') elif reason.type == ConnectionReplaced: - log.warning('Old connection with relay at %s was lost' % self.ip) + self.logger.warning('Connection replaced') else: - log.error('Connection with relay at %s was lost: %s' % (self.ip, reason.value)) + self.logger.error('Connection lost: {}'.format(reason.value)) for command, defer, timer in self.commands.itervalues(): timer.cancel() - defer.errback(RelayError('%r command failed: relay at %s disconnected' % (command.name, self.ip))) + defer.errback(RelayError('Relay disconnected')) if self.timedout is True: self.timedout = False if self.disconnect_timer.active(): self.disconnect_timer.cancel() self.disconnect_timer = None self.factory.connection_lost(self) class RelaySession(object): - def __init__(self, relay_ip, command): - self.relay_ip = relay_ip - self.dialog_id = command.parsed_headers.get('dialog_id') + def __init__(self, relay, command): + self.relay_ip = relay.ip + self.call_id = command.call_id + self.session_id = command.session_id + self.dialog_id = command.dialog_id + self.logger = SessionLogger(self) self.expire_time = None class RelayFactory(Factory): - noisy = False protocol = RelayServerProtocol + noisy = False def __init__(self, dispatcher): self.dispatcher = dispatcher self.relays = {} self.shutting_down = False state_file = process.runtime.file('dispatcher_state') try: self.sessions = pickle.load(open(state_file)) except: self.sessions = {} self.cleanup_timers = {} else: self.cleanup_timers = dict((ip, reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, ip)) for ip in set(session.relay_ip for session in self.sessions.itervalues())) unlink(state_file) self.expired_cleaner = RecurrentCall(600, self._remove_expired_sessions) def _remove_expired_sessions(self): now, limit = time(), DispatcherConfig.cleanup_expired_sessions_after obsolete = [k for k, s in ifilter(lambda (k, s): s.expire_time and (now-s.expire_time>=limit), self.sessions.iteritems())] if obsolete: [self.sessions.pop(call_id) for call_id in obsolete] log.warning('found %d expired sessions which were not removed during the last %d hours' % (len(obsolete), round(limit / 3600.0))) return KeepRunning def buildProtocol(self, addr): - ip = addr.host - log.debug('Connection from relay at %s' % ip) protocol = Factory.buildProtocol(self, addr) - protocol.ip = ip + protocol.ip = addr.host + protocol.logger = ProtocolLogger(name='relay {}'.format(addr.host)) + protocol.logger.info('Connection established') return protocol def new_relay(self, relay): old_relay = self.relays.pop(relay.ip, None) if old_relay is not None: - log.warning('Relay at %s reconnected, closing old connection' % relay.ip) - reactor.callLater(0, old_relay.transport.connectionLost, failure.Failure(ConnectionReplaced("relay reconnected"))) + relay.logger.warning('Reconnected, closing old connection') + reactor.callLater(0, old_relay.transport.connectionLost, failure.Failure(ConnectionReplaced('relay reconnected'))) self.relays[relay.ip] = relay timer = self.cleanup_timers.pop(relay.ip, None) if timer is not None: timer.cancel() defer = relay.send_command(Command('sessions')) defer.addCallback(self._cb_purge_sessions, relay.ip) def _cb_purge_sessions(self, result, relay_ip): relay_sessions = cjson.decode(result) relay_call_ids = [session['call_id'] for session in relay_sessions] for session_id, session in self.sessions.items(): if session.expire_time is None and session.relay_ip == relay_ip and session_id not in relay_call_ids: - log.warning('Session %s is no longer on relay %s, statistics are probably lost' % (session_id, relay_ip)) + session.logger.warning('Relay does not have the session anymore, statistics are probably lost') if session.dialog_id is not None: self.dispatcher.opensips_management.end_dialog(session.dialog_id) del self.sessions[session_id] def send_command(self, command): session = self.sessions.get(command.call_id, None) if session and session.expire_time is None: relay = session.relay_ip if relay not in self.relays: - raise RelayError('Relay for this session (%s) is no longer connected' % relay) + session.logger.error('Request {0.name!r} failed: relay no longer connected'.format(command)) + raise RelayError('Request {0.name!r} failed: relay no longer connected'.format(command)) return self.relays[relay].send_command(command) # We do not have a session for this call_id or the session is already expired if command.name == 'update': preferred_relay = command.parsed_headers.get('media_relay') try_relays = deque(protocol for protocol in self.relays.itervalues() if protocol.active and protocol.ip != preferred_relay) random.shuffle(try_relays) if preferred_relay is not None: protocol = self.relays.get(preferred_relay) if protocol is not None and protocol.active: try_relays.appendleft(protocol) else: log.warning('user requested media_relay %s is not available' % preferred_relay) defer = self._try_next(try_relays, command) defer.addCallback(self._add_session, try_relays, command) return defer elif command.name == 'remove' and session: # This is the remove we received for an expired session for which we triggered dialog termination del self.sessions[command.call_id] return 'removed' else: - raise RelayError('Got {0.name!r} command from OpenSIPS for unknown session with call-id {0.call_id}'.format(command)) + raise RelayError('Got {0.name!r} for unknown session {0.session_id}'.format(command)) def _add_session(self, result, try_relays, command): - self.sessions[command.call_id] = RelaySession(try_relays[0].ip, command) + self.sessions[command.call_id] = RelaySession(try_relays[0], command) return result def _relay_error(self, failure, try_relays, command): failure.trap(RelayError) failed_relay = try_relays.popleft() - log.warning('Relay %s failed: %s' % (failed_relay, failure.value)) + failed_relay.logger.warning('The {0.name!r} request failed: {1.value}'.format(command, failure)) return self._try_next(try_relays, command) def _try_next(self, try_relays, command): if len(try_relays) == 0: raise RelayError('No suitable relay found') defer = try_relays[0].send_command(command) defer.addErrback(self._relay_error, try_relays, command) return defer def get_summary(self): - defer = DeferredList([relay.send_command(Command('summary')).addErrback(self._summary_error, ip) for ip, relay in self.relays.iteritems()]) + command = Command('summary') + defer = DeferredList([relay.send_command(command).addErrback(self._summary_error, command, relay) for relay in self.relays.itervalues()]) defer.addCallback(self._got_summaries) return defer - def _summary_error(self, failure, ip): - log.error('Error processing query at relay %s: %s' % (ip, failure.value)) - return cjson.encode(dict(status="error", ip=ip)) + def _summary_error(self, failure, command, relay): + relay.logger.error('The {0.name!r} request failed: {1.value}'.format(command, failure)) + return cjson.encode(dict(status='error', ip=relay.ip)) def _got_summaries(self, results): return '[%s]' % ', '.join(result for succeeded, result in results if succeeded) def get_statistics(self): - defer = DeferredList([relay.send_command(Command('sessions')) for relay in self.relays.itervalues()]) + command = Command('sessions') + defer = DeferredList([relay.send_command(command).addErrback(self._statistics_error, command, relay) for relay in self.relays.itervalues()]) defer.addCallback(self._got_statistics) return defer + def _statistics_error(self, failure, command, relay): + relay.logger.error('The {0.name!r} request failed: {1.value}'.format(command, failure)) + return cjson.encode([]) + def _got_statistics(self, results): return '[%s]' % ', '.join(result[1:-1] for succeeded, result in results if succeeded and result != '[]') def connection_lost(self, relay): if relay not in self.relays.itervalues(): return if relay.authenticated: del self.relays[relay.ip] if self.shutting_down: if len(self.relays) == 0: self.defer.callback(None) else: self.cleanup_timers[relay.ip] = reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, relay.ip) def _do_cleanup(self, ip): - log.debug('Doing cleanup for old relay %s' % ip) + log.debug('Cleaning up after old relay at %s' % ip) del self.cleanup_timers[ip] - for call_id in [call_id for call_id, session in self.sessions.items() if session.relay_ip == ip]: + for call_id in (call_id for call_id, session in self.sessions.items() if session.relay_ip == ip): del self.sessions[call_id] def shutdown(self): if self.shutting_down: return self.shutting_down = True for timer in self.cleanup_timers.itervalues(): timer.cancel() if len(self.relays) == 0: retval = succeed(None) else: for prot in self.relays.itervalues(): prot.transport.loseConnection() self.defer = Deferred() retval = self.defer retval.addCallback(self._save_state) return retval def _save_state(self, result): pickle.dump(self.sessions, open(process.runtime.file('dispatcher_state'), 'w')) class Dispatcher(object): - def __init__(self): self.accounting = [__import__('mediaproxy.interfaces.accounting.%s' % mod.lower(), globals(), locals(), ['']).Accounting() for mod in set(DispatcherConfig.accounting)] self.cred = X509Credentials(cert_name='dispatcher') self.tls_context = TLSContext(self.cred) self.relay_factory = RelayFactory(self) dispatcher_addr, dispatcher_port = DispatcherConfig.listen self.relay_listener = reactor.listenTLS(dispatcher_port, self.relay_factory, self.tls_context, interface=dispatcher_addr) self.opensips_factory = OpenSIPSControlFactory(self) socket_path = process.runtime.file(DispatcherConfig.socket_path) unlink(socket_path) self.opensips_listener = reactor.listenUNIX(socket_path, self.opensips_factory) self.opensips_management = opensips.ManagementInterface() self.management_factory = ManagementControlFactory(self) management_addr, management_port = DispatcherConfig.listen_management if DispatcherConfig.management_use_tls: self.management_listener = reactor.listenTLS(management_port, self.management_factory, self.tls_context, interface=management_addr) else: self.management_listener = reactor.listenTCP(management_port, self.management_factory, interface=management_addr) def run(self): log.debug('Using {0.__class__.__name__}'.format(reactor)) process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) for accounting_module in self.accounting: accounting_module.start() reactor.run(installSignalHandlers=False) def send_command(self, command): return maybeDeferred(self.relay_factory.send_command, command) - def update_statistics(self, stats): - log.debug('Got statistics: %s' % stats) + def update_statistics(self, session, stats): + session.logger.info('statistics: {}'.format(stats)) if stats['start_time'] is not None: for accounting in self.accounting: try: accounting.do_accounting(stats) except Exception, e: log.exception('An unhandled error occurred while doing accounting: %s' % e) def _handle_SIGHUP(self, *args): log.info('Received SIGHUP, shutting down.') reactor.callFromThread(self._shutdown) def _handle_SIGINT(self, *args): if process.daemon: log.info('Received SIGINT, shutting down.') else: log.info('Received KeyboardInterrupt, exiting.') reactor.callFromThread(self._shutdown) def _handle_SIGTERM(self, *args): log.info('Received SIGTERM, shutting down.') reactor.callFromThread(self._shutdown) def _shutdown(self): defer = DeferredList([result for result in [self.opensips_listener.stopListening(), self.management_listener.stopListening(), self.relay_listener.stopListening()] if result is not None]) defer.addCallback(lambda x: self.opensips_factory.shutdown()) defer.addCallback(lambda x: self.management_factory.shutdown()) defer.addCallback(lambda x: self.relay_factory.shutdown()) defer.addCallback(lambda x: self._stop()) def _stop(self): for act in self.accounting: act.stop() reactor.stop() diff --git a/mediaproxy/mediacontrol.py b/mediaproxy/mediacontrol.py index 8d4e3cd..9a2675a 100644 --- a/mediaproxy/mediacontrol.py +++ b/mediaproxy/mediacontrol.py @@ -1,794 +1,826 @@ +import hashlib import struct -from time import time -from collections import deque -from operator import attrgetter -from itertools import chain from application import log +from base64 import b64encode as base64_encode +from itertools import chain +from collections import deque +from operator import attrgetter +from time import time from twisted.internet import reactor from twisted.internet.interfaces import IReadDescriptor from twisted.internet.protocol import DatagramProtocol from twisted.internet.error import CannotListenError from twisted.python.log import Logger from zope.interface import implements from mediaproxy.configuration import RelayConfig from mediaproxy.interfaces.system import _conntrack from mediaproxy.iputils import is_routable_ip from mediaproxy.scheduler import RecurrentCall, KeepRunning - -UDP_TIMEOUT_FILE = "/proc/sys/net/ipv4/netfilter/ip_conntrack_udp_timeout_stream" +UDP_TIMEOUT_FILE = '/proc/sys/net/ipv4/netfilter/ip_conntrack_udp_timeout_stream' rtp_payloads = { - 0: "G711u", 1: "1016", 2: "G721", 3: "GSM", 4: "G723", 5: "DVI4", 6: "DVI4", - 7: "LPC", 8: "G711a", 9: "G722", 10: "L16", 11: "L16", 14: "MPA", 15: "G728", - 18: "G729", 25: "CelB", 26: "JPEG", 28: "nv", 31: "H261", 32: "MPV", 33: "MP2T", - 34: "H263" + 0: 'G711u', 1: '1016', 2: 'G721', 3: 'GSM', 4: 'G723', 5: 'DVI4', 6: 'DVI4', + 7: 'LPC', 8: 'G711a', 9: 'G722', 10: 'L16', 11: 'L16', 14: 'MPA', 15: 'G728', + 18: 'G729', 25: 'CelB', 26: 'JPEG', 28: 'nv', 31: 'H261', 32: 'MPV', 33: 'MP2T', + 34: 'H263' } + class RelayPortsExhaustedError(Exception): pass if RelayConfig.relay_ip is None: - raise RuntimeError("Could not determine default host IP; either add default route or specify relay IP manually") + raise RuntimeError('Could not determine default host IP; either add default route or specify relay IP manually') + + +class SessionLogger(log.ContextualLogger): + def __init__(self, session): + super(SessionLogger, self).__init__(logger=log.get_logger()) # use the main logger as backend + self.session_id = session.session_id + + def apply_context(self, message): + return '[session {0.session_id}] {1}'.format(self, message) if message != '' else '' class Address(object): """Representation of an endpoint address""" + def __init__(self, host, port, in_use=True, got_rtp=False): self.host = host self.port = port self.in_use = self.__nonzero__() and in_use self.got_rtp = got_rtp + def __len__(self): return 2 + def __nonzero__(self): return None not in (self.host, self.port) + def __getitem__(self, index): return (self.host, self.port)[index] + def __contains__(self, item): return item in (self.host, self.port) + def __iter__(self): yield self.host yield self.port + def __str__(self): - return self.__nonzero__() and ("%s:%d" % (self.host, self.port)) or "Unknown" + return self.__nonzero__() and ('%s:%d' % (self.host, self.port)) or 'Unknown' + def __repr__(self): - return "%s(%r, %r, in_use=%r, got_rtp=%r)" % (self.__class__.__name__, self.host, self.port, self.in_use, self.got_rtp) + return '%s(%r, %r, in_use=%r, got_rtp=%r)' % (self.__class__.__name__, self.host, self.port, self.in_use, self.got_rtp) + def forget(self): self.host, self.port, self.in_use, self.got_rtp = None, None, False, False + @property def unknown(self): return None in (self.host, self.port) + @property def obsolete(self): return self.__nonzero__() and not self.in_use class Counters(dict): def __add__(self, other): n = Counters(self) for k, v in other.iteritems(): n[k] += v return n + def __iadd__(self, other): for k, v in other.iteritems(): self[k] += v return self + @property def caller_bytes(self): return self['caller_bytes'] + @property def callee_bytes(self): return self['callee_bytes'] + @property def caller_packets(self): return self['caller_packets'] + @property def callee_packets(self): return self['callee_packets'] + @property def relayed_bytes(self): return self['caller_bytes'] + self['callee_bytes'] + @property def relayed_packets(self): return self['caller_packets'] + self['callee_packets'] class StreamListenerProtocol(DatagramProtocol): noisy = False def __init__(self): self.cb_func = None self.sdp = None self.send_packet_count = 0 self.stun_queue = [] def datagramReceived(self, data, (host, port)): if self.cb_func is not None: self.cb_func(host, port, data) def set_remote_sdp(self, ip, port): if is_routable_ip(ip): self.sdp = ip, port else: self.sdp = None def send(self, data, is_stun, ip=None, port=None): if is_stun: self.stun_queue.append(data) if ip is None or port is None: # this means that we have not received any packets from this host yet, # so we have not learnt its address if self.sdp is None: # we can't do anything if we haven't received the SDP IP yet or # it was in a private range return ip, port = self.sdp # we learnt the IP, empty the STUN packets queue if self.stun_queue: for data in self.stun_queue: self.transport.write(data, (ip, port)) self.stun_queue = [] if not is_stun: if not self.send_packet_count % RelayConfig.userspace_transmit_every: self.transport.write(data, (ip, port)) self.send_packet_count += 1 def _stun_test(data): # Check if data is a STUN request and if it's a binding request if len(data) < 20: return False, False - msg_type, msg_len, magic = struct.unpack("!HHI", data[:8]) + msg_type, msg_len, magic = struct.unpack('!HHI', data[:8]) if msg_type & 0xc == 0 and magic == 0x2112A442: if msg_type == 0x0001: return True, True else: return True, False else: return False, False -class MediaSubParty(object): +class MediaSubParty(object): def __init__(self, substream, listener): self.substream = substream + self.logger = substream.logger self.listener = listener self.listener.protocol.cb_func = self.got_data self.remote = Address(None, None) host = self.listener.protocol.transport.getHost() self.local = Address(host.host, host.port) self.timer = None - self.codec = "Unknown" + self.codec = 'Unknown' self.got_stun_probing = False self.reset() def reset(self): if self.timer and self.timer.active(): self.timer.cancel() - self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, "no-traffic timeout", RelayConfig.stream_timeout) - self.remote.in_use = False # keep remote address around but mark it as obsolete + self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, 'no-traffic timeout', RelayConfig.stream_timeout) + self.remote.in_use = False # keep remote address around but mark it as obsolete self.remote.got_rtp = False self.got_stun_probing = False self.listener.protocol.send_packet_count = 0 def before_hold(self): if self.timer and self.timer.active(): self.timer.cancel() - self.timer = reactor.callLater(RelayConfig.on_hold_timeout, self.substream.expired, "on hold timeout", RelayConfig.on_hold_timeout) + self.timer = reactor.callLater(RelayConfig.on_hold_timeout, self.substream.expired, 'on hold timeout', RelayConfig.on_hold_timeout) def after_hold(self): if self.timer and self.timer.active(): self.timer.cancel() if not self.remote.in_use: - self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, "no-traffic timeout", RelayConfig.stream_timeout) + self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, 'no-traffic timeout', RelayConfig.stream_timeout) def got_data(self, host, port, data): if (host, port) == tuple(self.remote): if self.remote.obsolete: # the received packet matches the previously used IP/port, # which has been made obsolete, so ignore it return else: if self.remote.in_use: # the received packet is different than the recorded IP/port, # so we will discard it return # we have learnt the remote IP/port self.remote.host, self.remote.port = host, port self.remote.in_use = True - log.debug("Got traffic information for stream: %s" % self.substream.stream) + self.logger.info('discovered peer: %s' % self.substream.stream) is_stun, is_binding_request = _stun_test(data) self.substream.send_data(self, data, is_stun) if not self.remote.got_rtp and not is_stun: # This is the first RTP packet received self.remote.got_rtp = True if self.timer: if self.timer.active(): self.timer.cancel() self.timer = None - if self.codec == "Unknown" and self.substream is self.substream.stream.rtp: + if self.codec == 'Unknown' and self.substream is self.substream.stream.rtp: try: pt = ord(data[1]) & 127 except IndexError: pass else: if pt > 95: - self.codec = "Dynamic(%d)" % pt + self.codec = 'Dynamic(%d)' % pt elif pt in rtp_payloads: self.codec = rtp_payloads[pt] else: - self.codec = "Unknown(%d)" % pt + self.codec = 'Unknown(%d)' % pt self.substream.check_create_conntrack() if is_binding_request: self.got_stun_probing = True def cleanup(self): if self.timer and self.timer.active(): self.timer.cancel() self.timer = None self.listener.protocol.cb_func = None self.substream = None class MediaSubStream(object): - def __init__(self, stream, listener_caller, listener_callee): self.stream = stream + self.logger = stream.logger self.forwarding_rule = None self.caller = MediaSubParty(self, listener_caller) self.callee = MediaSubParty(self, listener_callee) self._counters = Counters(caller_bytes=0, callee_bytes=0, caller_packets=0, callee_packets=0) @property def counters(self): """Accumulated counters from all the forwarding rules the stream had""" if self.forwarding_rule is None: return self._counters else: try: return self._counters + self.forwarding_rule.counters except _conntrack.Error: return self._counters def _stop_relaying(self): if self.forwarding_rule is not None: try: self._counters += self.forwarding_rule.counters except _conntrack.Error: pass self.forwarding_rule = None def reset(self, party): - if party == "caller": + if party == 'caller': self.caller.reset() else: self.callee.reset() self._stop_relaying() def check_create_conntrack(self): if self.stream.first_media_time is None: self.stream.first_media_time = time() if self.caller.remote.in_use and self.caller.remote.got_rtp and self.callee.remote.in_use and self.callee.remote.got_rtp: self.forwarding_rule = _conntrack.ForwardingRule(self.caller.remote, self.caller.local, self.callee.remote, self.callee.local, self.stream.session.mark) self.forwarding_rule.expired_func = self.conntrack_expired def send_data(self, source, data, is_stun): if source is self.caller: dest = self.callee else: dest = self.caller if dest.remote: # if we have already learnt the remote address of the destination, use that ip, port = dest.remote.host, dest.remote.port dest.listener.protocol.send(data, is_stun, ip, port) else: # otherwise use the IP/port specified in the SDP, if public dest.listener.protocol.send(data, is_stun) def conntrack_expired(self): try: timeout_wait = int(open(UDP_TIMEOUT_FILE).read()) except: timeout_wait = 0 - self.expired("conntrack timeout", timeout_wait) + self.expired('conntrack timeout', timeout_wait) def expired(self, reason, timeout_wait): self._stop_relaying() self.stream.substream_expired(self, reason, timeout_wait) def cleanup(self): self.caller.cleanup() self.callee.cleanup() self._stop_relaying() self.stream = None class MediaParty(object): - def __init__(self, stream): self.manager = stream.session.manager + self.logger = stream.logger self._remote_sdp = None self.is_on_hold = False self.uses_ice = False while True: self.listener_rtp = None self.ports = port_rtp, port_rtcp = self.manager.get_ports() try: self.listener_rtp = reactor.listenUDP(port_rtp, StreamListenerProtocol(), interface=RelayConfig.relay_ip) self.listener_rtcp = reactor.listenUDP(port_rtcp, StreamListenerProtocol(), interface=RelayConfig.relay_ip) except CannotListenError: if self.listener_rtp is not None: self.listener_rtp.stopListening() self.manager.set_bad_ports(self.ports) - log.warning('Cannot use port pair %d/%d' % self.ports) + self.logger.warning('Cannot use port pair %d/%d' % self.ports) else: break def _get_remote_sdp(self): return self._remote_sdp def _set_remote_sdp(self, (ip, port)): self._remote_sdp = ip, port self.listener_rtp.protocol.set_remote_sdp(ip, port) + remote_sdp = property(_get_remote_sdp, _set_remote_sdp) def cleanup(self): self.listener_rtp.stopListening() self.listener_rtcp.stopListening() self.manager.free_ports(self.ports) self.manager = None -class MediaStream(object): +class MediaStream(object): def __init__(self, session, media_type, media_ip, media_port, direction, media_parameters, initiating_party): self.is_alive = True - self.session = session + self.session = session # type: Session + self.logger = session.logger self.media_type = media_type self.caller = MediaParty(self) self.callee = MediaParty(self) self.rtp = MediaSubStream(self, self.caller.listener_rtp, self.callee.listener_rtp) self.rtcp = MediaSubStream(self, self.caller.listener_rtcp, self.callee.listener_rtcp) getattr(self, initiating_party).remote_sdp = (media_ip, media_port) - getattr(self, initiating_party).uses_ice = (media_parameters.get("ice", "no") == "yes") + getattr(self, initiating_party).uses_ice = (media_parameters.get('ice', 'no') == 'yes') self.check_hold(initiating_party, direction, media_ip) self.create_time = time() self.first_media_time = None self.start_time = None self.end_time = None - self.status = "active" + self.status = 'active' self.timeout_wait = 0 def __str__(self): if self.caller.remote_sdp is None: - src = "Unknown" + src = 'Unknown' else: - src = "%s:%d" % self.caller.remote_sdp + src = '%s:%d' % self.caller.remote_sdp if self.caller.is_on_hold: - src += " ON HOLD" + src += ' ON HOLD' if self.caller.uses_ice: - src += " (ICE)" + src += ' (ICE)' if self.callee.remote_sdp is None: - dst = "Unknown" + dst = 'Unknown' else: - dst = "%s:%d" % self.callee.remote_sdp + dst = '%s:%d' % self.callee.remote_sdp if self.callee.is_on_hold: - dst += " ON HOLD" + dst += ' ON HOLD' if self.callee.uses_ice: - dst += " (ICE)" + dst += ' (ICE)' rtp = self.rtp rtcp = self.rtcp - return "(%s) %s (RTP: %s, RTCP: %s) <-> %s <-> %s <-> %s (RTP: %s, RTCP: %s)" % ( + return '(%s) %s (RTP: %s, RTCP: %s) <-> %s <-> %s <-> %s (RTP: %s, RTCP: %s)' % ( self.media_type, src, rtp.caller.remote, rtcp.caller.remote, rtp.caller.local, rtp.callee.local, dst, rtp.callee.remote, rtcp.callee.remote) @property def counters(self): return self.rtp.counters + self.rtcp.counters @property def is_on_hold(self): return self.caller.is_on_hold or self.callee.is_on_hold def check_hold(self, party, direction, ip): previous_hold = self.is_on_hold party = getattr(self, party) - if direction == "sendonly" or direction == "inactive": + if direction == 'sendonly' or direction == 'inactive': party.is_on_hold = True - elif ip == "0.0.0.0": + elif ip == '0.0.0.0': party.is_on_hold = True else: party.is_on_hold = False if previous_hold and not self.is_on_hold: for substream in [self.rtp, self.rtcp]: for subparty in [substream.caller, substream.callee]: - self.status = "active" + self.status = 'active' subparty.after_hold() if not previous_hold and self.is_on_hold: for substream in [self.rtp, self.rtcp]: for subparty in [substream.caller, substream.callee]: - self.status = "on hold" + self.status = 'on hold' subparty.before_hold() def reset(self, party, media_ip, media_port): self.rtp.reset(party) self.rtcp.reset(party) getattr(self, party).remote_sdp = (media_ip, media_port) def substream_expired(self, substream, reason, timeout_wait): if substream is self.rtp and self.caller.uses_ice and self.callee.uses_ice: - log.debug("RTP stream expired for session %s: %s" % (self.session, reason)) - reason = "unselected ICE candidate" + reason = 'unselected ICE candidate' + self.logger.info('RTP stream expired: {}'.format(reason)) if not substream.caller.got_stun_probing and not substream.callee.got_stun_probing: - log.debug("unselected ICE candidate for session %s but no STUN was received" % self.session) - if substream is self.rtcp or (self.is_on_hold and reason=='conntrack timeout'): + self.logger.info('unselected ICE candidate, but no STUN was received') + if substream is self.rtcp or (self.is_on_hold and reason == 'conntrack timeout'): # Forget about the remote addresses, this will cause any # re-occurrence of the same traffic to be forwarded again substream.caller.remote.forget() substream.caller.listener.protocol.send_packet_count = 0 substream.callee.remote.forget() substream.callee.listener.protocol.send_packet_count = 0 else: session = self.session self.cleanup(reason) self.timeout_wait = timeout_wait session.stream_expired(self) - def cleanup(self, status="closed"): + def cleanup(self, status='closed'): if self.is_alive: self.is_alive = False self.status = status self.caller.cleanup() self.callee.cleanup() self.rtp.cleanup() self.rtcp.cleanup() self.session = None self.end_time = time() class Session(object): - - def __init__(self, manager, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media_list, is_downstream, is_caller_cseq, mark = 0): + def __init__(self, manager, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media_list, is_downstream, is_caller_cseq, mark=0): self.manager = manager self.dispatcher = dispatcher + self.session_id = base64_encode(hashlib.md5(call_id).digest()).rstrip('=') self.call_id = call_id self.from_tag = from_tag self.to_tag = None self.mark = mark self.from_uri = from_uri self.to_uri = to_uri self.caller_ua = None self.callee_ua = None self.cseq = None self.previous_cseq = None self.streams = {} self.start_time = None self.end_time = None + self.logger = SessionLogger(self) + self.logger.info('created (call-id: {0.call_id} from-tag: {0.from_tag})'.format(self)) self.update_media(cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq) - def __str__(self): - return "%s: %s (%s) --> %s" % (self.call_id, self.from_uri, self.from_tag, self.to_uri) - def update_media(self, cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq): if self.cseq is None: - old_cseq = (0,0) + old_cseq = (0, 0) else: old_cseq = self.cseq if is_caller_cseq: cseq = (cseq, old_cseq[1]) if self.to_tag is None and to_tag is not None: self.to_tag = to_tag else: cseq = (old_cseq[0], cseq) if is_downstream: - party = "caller" + party = 'caller' if self.caller_ua is None: self.caller_ua = user_agent else: - party = "callee" + party = 'callee' if self.callee_ua is None: self.callee_ua = user_agent if self.cseq is None or cseq > self.cseq: if not media_list: return - log.debug("Received new SDP offer") + self.logger.info('got SDP offer') self.streams[cseq] = new_streams = [] if self.cseq is None: old_streams = [] else: old_streams = self.streams[self.cseq] for media_type, media_ip, media_port, media_direction, media_parameters in media_list: - stream = None for old_stream in old_streams: old_remote = getattr(old_stream, party).remote_sdp if old_remote is not None: old_ip, old_port = old_remote else: old_ip, old_port = None, None if old_stream.is_alive and old_stream.media_type == media_type and ((media_ip, media_port) in ((old_ip, old_port), ('0.0.0.0', old_port), (old_ip, 0))): stream = old_stream stream.check_hold(party, media_direction, media_ip) - log.debug("Found matching existing stream: %s" % stream) + if media_port == 0: + self.logger.info('disabled stream: %s', stream) + else: + self.logger.info('retained stream: %s', stream) break - if stream is None: + else: stream = MediaStream(self, media_type, media_ip, media_port, media_direction, media_parameters, party) - log.debug("Added new stream: %s" % stream) + self.logger.info('proposed stream: %s' % stream) if media_port == 0: stream.cleanup() - log.debug("Stream explicitly closed: %s" % stream) new_streams.append(stream) if self.previous_cseq is not None: for stream in self.streams[self.previous_cseq]: if stream not in self.streams[self.cseq] + new_streams: stream.cleanup() self.previous_cseq = self.cseq self.cseq = cseq elif self.cseq == cseq: - log.debug("Received updated SDP answer") + self.logger.info('got SDP answer') now = time() if self.start_time is None: self.start_time = now current_streams = self.streams[cseq] for stream in current_streams: if stream.start_time is None: stream.start_time = now if to_tag is not None and not media_list: return if len(media_list) < len(current_streams): for stream in current_streams[len(media_list):]: - log.debug("Stream rejected by not being included in the SDP answer: %s" % stream) - stream.cleanup("rejected") + self.logger.info('removed! stream: %s' % stream) + stream.cleanup('rejected') for stream, (media_type, media_ip, media_port, media_direction, media_parameters) in zip(current_streams, media_list): if stream.media_type != media_type: - raise ValueError('Media types do not match: "%s" and "%s"' % (stream.media_type, media_type)) + raise ValueError('Media types do not match: %r and %r' % (stream.media_type, media_type)) if media_port == 0: - log.debug("Stream explicitly rejected: %s" % stream) - stream.cleanup("rejected") + if stream.is_alive: + self.logger.info('rejected stream: %s' % stream) + else: + self.logger.info('disabled stream: %s' % stream) + stream.cleanup('rejected') continue stream.check_hold(party, media_direction, media_ip) party_info = getattr(stream, party) - party_info.uses_ice = (media_parameters.get("ice", "no") == "yes") - if party_info.remote_sdp is None or party_info.remote_sdp[0] == "0.0.0.0": + party_info.uses_ice = (media_parameters.get('ice', 'no') == 'yes') + if party_info.remote_sdp is None or party_info.remote_sdp[0] == '0.0.0.0': party_info.remote_sdp = (media_ip, media_port) - log.debug("Got initial answer from %s for stream: %s" % (party, stream)) + self.logger.info('accepted stream: %s' % stream) else: if party_info.remote_sdp[1] != media_port or (party_info.remote_sdp[0] != media_ip != '0.0.0.0'): stream.reset(party, media_ip, media_port) - log.debug("Updated %s for stream: %s" % (party, stream)) + self.logger.info('updating stream: %s' % stream) else: - log.debug("Unchanged stream: %s" % stream) + self.logger.info('retained stream: %s' % stream) if self.previous_cseq is not None: for stream in [stream for stream in self.streams[self.previous_cseq] if stream not in current_streams]: - log.debug("Removing old stream: %s" % stream) + self.logger.info('removing stream: %s' % stream) stream.cleanup() else: - log.debug("Received old CSeq %d:%d, ignoring" % cseq) + self.logger.info('got old CSeq %d:%d, ignoring' % cseq) def get_local_media(self, is_downstream, cseq, is_caller_cseq): if is_caller_cseq: pos = 0 else: pos = 1 try: cseq = max(key for key in self.streams.keys() if key[pos] == cseq) except ValueError: return None if is_downstream: - retval = [(stream.status in ["active", "on hold"]) and tuple(stream.rtp.callee.local) or (stream.rtp.callee.local.host, 0) for stream in self.streams[cseq]] + retval = [(stream.status in ['active', 'on hold']) and tuple(stream.rtp.callee.local) or (stream.rtp.callee.local.host, 0) for stream in self.streams[cseq]] else: - retval = [(stream.status in ["active", "on hold"]) and tuple(stream.rtp.caller.local) or (stream.rtp.caller.local.host, 0) for stream in self.streams[cseq]] + retval = [(stream.status in ['active', 'on hold']) and tuple(stream.rtp.caller.local) or (stream.rtp.caller.local.host, 0) for stream in self.streams[cseq]] return retval def cleanup(self): self.end_time = time() for cseq in [self.previous_cseq, self.cseq]: if cseq is not None: for stream in self.streams[cseq]: stream.cleanup() def stream_expired(self, stream): active_streams = set() for cseq in [self.previous_cseq, self.cseq]: if cseq is not None: active_streams.update({stream for stream in self.streams[cseq] if stream.is_alive}) if len(active_streams) == 0: self.manager.session_expired(self.call_id, self.from_tag) @property def duration(self): if self.start_time is not None: if self.end_time is not None: return int(self.end_time - self.start_time) else: return int(time() - self.start_time) else: return 0 @property def relayed_bytes(self): return sum(stream.counters.relayed_bytes for stream in set(chain(*self.streams.itervalues()))) @property def statistics(self): all_streams = set(chain(*self.streams.itervalues())) attributes = ('call_id', 'from_tag', 'from_uri', 'to_tag', 'to_uri', 'start_time', 'duration') stats = dict((name, getattr(self, name)) for name in attributes) stats['caller_ua'] = self.caller_ua or 'Unknown' stats['callee_ua'] = self.callee_ua or 'Unknown' stats['streams'] = streams = [] stream_attributes = ('media_type', 'status', 'timeout_wait') - for stream in sorted(all_streams, key=attrgetter('start_time')): + for stream in sorted(all_streams, key=attrgetter('start_time')): # type: MediaStream info = dict((name, getattr(stream, name)) for name in stream_attributes) info['caller_codec'] = stream.rtp.caller.codec info['callee_codec'] = stream.rtp.callee.codec if stream.start_time is None: info['start_time'] = info['end_time'] = None elif self.start_time is None: info['start_time'] = info['end_time'] = 0 else: info['start_time'] = max(int(stream.start_time - self.start_time), 0) if stream.status == 'rejected': info['end_time'] = info['start_time'] else: if stream.end_time is None: info['end_time'] = stats['duration'] else: info['end_time'] = min(int(stream.end_time - self.start_time), self.duration) if stream.first_media_time is None: info['post_dial_delay'] = None else: info['post_dial_delay'] = stream.first_media_time - stream.create_time caller = stream.rtp.caller callee = stream.rtp.callee info.update(stream.counters) info['caller_local'] = str(caller.local) info['callee_local'] = str(callee.local) info['caller_remote'] = str(caller.remote) info['callee_remote'] = str(callee.remote) streams.append(info) return stats class SessionManager(Logger): implements(IReadDescriptor) def __init__(self, relay, start_port, end_port): self.relay = relay - self.ports = deque((i, i+1) for i in xrange(start_port, end_port, 2)) + self.ports = deque((i, i + 1) for i in xrange(start_port, end_port, 2)) self.bad_ports = deque() self.sessions = {} self.watcher = _conntrack.ExpireWatcher() - self.active_byte_counter = 0 # relayed byte counter for sessions active during last speed measurement - self.closed_byte_counter = 0 # relayed byte counter for sessions closed after last speed measurement + self.active_byte_counter = 0 # relayed byte counter for sessions active during last speed measurement + self.closed_byte_counter = 0 # relayed byte counter for sessions closed after last speed measurement self.bps_relayed = 0 if RelayConfig.traffic_sampling_period > 0: self.speed_calculator = RecurrentCall(RelayConfig.traffic_sampling_period, self._measure_speed) else: self.speed_calculator = None reactor.addReader(self) def _measure_speed(self): start_time = time() current_byte_counter = sum(session.relayed_bytes for session in self.sessions.itervalues()) self.bps_relayed = 8 * (current_byte_counter + self.closed_byte_counter - self.active_byte_counter) / RelayConfig.traffic_sampling_period self.active_byte_counter = current_byte_counter self.closed_byte_counter = 0 us_taken = int((time() - start_time) * 1000000) if us_taken > 10000: log.warning('Aggregate speed calculation time exceeded 10ms: %d us for %d sessions' % (us_taken, len(self.sessions))) return KeepRunning # implemented for IReadDescriptor def fileno(self): return self.watcher.fd def doRead(self): stream = self.watcher.read() if stream: stream.expired_func() def connectionLost(self, reason): reactor.removeReader(self) # port management def get_ports(self): if len(self.bad_ports) > len(self.ports): log.debug('Excessive amount of bad ports, doing cleanup') self.ports.extend(self.bad_ports) self.bad_ports = deque() try: return self.ports.popleft() except IndexError: raise RelayPortsExhaustedError() def set_bad_ports(self, ports): self.bad_ports.append(ports) def free_ports(self, ports): self.ports.append(ports) # called by higher level def _find_session_key(self, call_id, from_tag, to_tag): key_from = (call_id, from_tag) if key_from in self.sessions: return key_from if to_tag: key_to = (call_id, to_tag) if key_to in self.sessions: return key_to return None def has_session(self, call_id, from_tag, to_tag=None, **kw): return any((call_id, tag) in self.sessions for tag in (from_tag, to_tag) if tag is not None) def update_session(self, dispatcher, call_id, from_tag, from_uri, to_uri, cseq, user_agent, type, media=[], to_tag=None, **kw): key = self._find_session_key(call_id, from_tag, to_tag) if key: session = self.sessions[key] - log.debug("updating existing session %s" % session) - is_downstream = (session.from_tag != from_tag) ^ (type == "request") + is_downstream = (session.from_tag != from_tag) ^ (type == 'request') is_caller_cseq = (session.from_tag == from_tag) session.update_media(cseq, to_tag, user_agent, media, is_downstream, is_caller_cseq) - elif type == "reply" and not media: + elif type == 'reply' and not media: return None else: - is_downstream = type == "request" + is_downstream = type == 'request' is_caller_cseq = True session = Session(self, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media, is_downstream, is_caller_cseq) self.sessions[(call_id, from_tag)] = session self.relay.add_session(dispatcher) - log.debug("created new session %s" % session) return session.get_local_media(is_downstream, cseq, is_caller_cseq) def remove_session(self, call_id, from_tag, to_tag=None, **kw): key = self._find_session_key(call_id, from_tag, to_tag) try: session = self.sessions[key] except KeyError: log.warning('The dispatcher tried to remove a session which is no longer present on the relay') return None - log.debug('removing session %s' % session) + session.logger.info('removed') session.cleanup() self.closed_byte_counter += session.relayed_bytes del self.sessions[key] reactor.callLater(0, self.relay.remove_session, session.dispatcher) return session def session_expired(self, call_id, from_tag): key = (call_id, from_tag) try: session = self.sessions[key] except KeyError: - log.warning('A session expired that was no longer present on the relay') + log.warning('A session expired but is no longer present on the relay') return - log.debug('expired session %s' % session) + session.logger.info('expired') session.cleanup() self.closed_byte_counter += session.relayed_bytes del self.sessions[key] self.relay.session_expired(session) self.relay.remove_session(session.dispatcher) def cleanup(self): if self.speed_calculator is not None: self.speed_calculator.cancel() for key in self.sessions.keys(): self.session_expired(*key) @property def statistics(self): return [session.statistics for session in self.sessions.itervalues()] @property def stream_count(self): stream_count = {} for session in self.sessions.itervalues(): for stream in set(chain(*session.streams.itervalues())): if stream.is_alive: stream_count[stream.media_type] = stream_count.get(stream.media_type, 0) + 1 return stream_count - diff --git a/mediaproxy/relay.py b/mediaproxy/relay.py index cf82ea7..0302ade 100644 --- a/mediaproxy/relay.py +++ b/mediaproxy/relay.py @@ -1,392 +1,392 @@ """Implementation of the MediaProxy relay""" import cjson import signal import resource try: from twisted.internet import epollreactor; epollreactor.install() except: raise RuntimeError('mandatory epoll reactor support is not available from the twisted framework') from application import log from application.process import process from gnutls.errors import CertificateError, CertificateSecurityError from gnutls.interfaces.twisted import TLSContext from time import time from twisted.protocols.basic import LineOnlyReceiver from twisted.internet.error import ConnectionDone, TCPTimedOutError, DNSLookupError from twisted.internet.protocol import ClientFactory, connectionDone from twisted.internet.defer import DeferredList, succeed from twisted.internet import reactor from twisted.python import failure from twisted.names import dns from twisted.names.client import lookupService from twisted.names.error import DomainError from mediaproxy import __version__ from mediaproxy.configuration import RelayConfig from mediaproxy.headers import DecodingDict, DecodingError from mediaproxy.mediacontrol import SessionManager, RelayPortsExhaustedError from mediaproxy.scheduler import RecurrentCall, KeepRunning from mediaproxy.tls import X509Credentials # Increase the system limit for the maximum number of open file descriptors # to be able to handle connections to all ports in port_range fd_limit = RelayConfig.port_range.end - RelayConfig.port_range.start + 1000 try: resource.setrlimit(resource.RLIMIT_NOFILE, (fd_limit, fd_limit)) except ValueError: raise RuntimeError('Cannot set resource limit for maximum open file descriptors to %d' % fd_limit) else: new_limits = resource.getrlimit(resource.RLIMIT_NOFILE) if new_limits < (fd_limit, fd_limit): raise RuntimeError("Allocated resource limit for maximum open file descriptors is less then requested (%d instead of %d)" % (new_limits[0], fd_limit)) else: log.info('Set resource limit for maximum open file descriptors to %d' % fd_limit) class RelayClientProtocol(LineOnlyReceiver): noisy = False required_headers = {'update': {'call_id', 'from_tag', 'from_uri', 'to_uri', 'cseq', 'user_agent', 'type'}, 'remove': {'call_id', 'from_tag'}, 'summary': set(), 'sessions': set()} def __init__(self): self.command = None self.seq = None self.headers = DecodingDict() self._connection_watcher = None self._queued_keepalives = 0 def _send_keepalive(self): if self._queued_keepalives >= 3: log.error('missed 3 keepalive answers in a row. assuming the connection is down.') # do not use loseConnection() as it waits to flush the output buffers. reactor.callLater(0, self.transport.connectionLost, failure.Failure(TCPTimedOutError())) return None self.transport.write('ping' + self.delimiter) self._queued_keepalives += 1 return KeepRunning def reply(self, reply): self.transport.write(reply + self.delimiter) def connectionMade(self): peer = self.transport.getPeer() - log.debug('Connected to dispatcher at %s:%d' % (peer.host, peer.port)) + log.info('Connected to dispatcher at %s:%d' % (peer.host, peer.port)) if RelayConfig.passport is not None: peer_cert = self.transport.getPeerCertificate() if not RelayConfig.passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) self._connection_watcher = RecurrentCall(RelayConfig.keepalive_interval, self._send_keepalive) def connectionLost(self, reason=connectionDone): if self._connection_watcher is not None: self._connection_watcher.cancel() self._connection_watcher = None self._queued_keepalives = 0 def lineReceived(self, line): if line == 'pong': self._queued_keepalives -= 1 return if self.command is None: try: command, seq = line.split() except ValueError: log.error('Could not decode command/sequence number pair from dispatcher: %s' % line) return if command in self.required_headers: self.command = command self.seq = seq self.headers = DecodingDict() else: log.error('Unknown command: %s' % command) self.reply('{} error'.format(seq)) elif line == '': missing_headers = self.required_headers[self.command].difference(self.headers) if missing_headers: for header in missing_headers: log.error('Missing mandatory header %r from %r command' % (header, self.command)) response = 'error' else: # noinspection PyBroadException try: response = self.factory.parent.got_command(self.factory.host, self.command, self.headers) except Exception: log.exception() response = 'error' self.reply('{} {}'.format(self.seq, response)) self.command = None else: try: name, value = line.split(": ", 1) except ValueError: log.error('Unable to parse header: %s' % line) else: try: self.headers[name] = value except DecodingError, e: log.error('Could not decode header: %s' % e) class DispatcherConnectingFactory(ClientFactory): noisy = False protocol = RelayClientProtocol def __init__(self, parent, host, port): self.parent = parent self.host = (host, port) self.delayed = None self.connection_lost = False def __eq__(self, other): return self.host == other.host def clientConnectionFailed(self, connector, reason): log.error('Could not connect to dispatcher at %(host)s:%(port)d (retrying in %%d seconds): %%s' % connector.__dict__ % (RelayConfig.reconnect_delay, reason.value)) if self.parent.connector_needs_reconnect(connector): self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect) def clientConnectionLost(self, connector, reason): self.cancel_delayed() if reason.type != ConnectionDone: log.error('Connection with dispatcher at %(host)s:%(port)d was lost: %%s' % connector.__dict__ % reason.value) else: log.info('Connection with dispatcher at %(host)s:%(port)d was closed' % connector.__dict__) if self.parent.connector_needs_reconnect(connector): if isinstance(reason.value, CertificateError) or self.connection_lost: self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect) else: self.delayed = reactor.callLater(min(RelayConfig.reconnect_delay, 1), connector.connect) self.connection_lost = True def buildProtocol(self, addr): self.delayed = reactor.callLater(5, self._connected_successfully) return ClientFactory.buildProtocol(self, addr) def _connected_successfully(self): self.connection_lost = False def cancel_delayed(self): if self.delayed: if self.delayed.active(): self.delayed.cancel() self.delayed = None class SRVMediaRelayBase(object): def __init__(self): self.srv_monitor = RecurrentCall(RelayConfig.dns_check_interval, self._do_lookup) self._do_lookup() def _do_lookup(self): defers = [] for addr, port, is_domain in RelayConfig.dispatchers: if is_domain: defer = lookupService("_sip._udp.%s" % addr) defer.addCallback(self._cb_got_srv, port) defer.addErrback(self._eb_no_srv, addr, port) defers.append(defer) else: defers.append(succeed((addr, port))) defer = DeferredList(defers) defer.addCallback(self._cb_got_all) return KeepRunning def _cb_got_srv(self, (answers, auth, add), port): for answer in answers: if answer.type == dns.SRV and answer.payload and answer.payload.target != dns.Name("."): return str(answer.payload.target), port raise DomainError def _eb_no_srv(self, failure, addr, port): failure.trap(DomainError) return reactor.resolve(addr).addCallback(lambda host: (host, port)).addErrback(self._eb_no_dns, addr) def _eb_no_dns(self, failure, addr): failure.trap(DNSLookupError) log.error("Could resolve neither SRV nor A record for '%s'" % addr) def _cb_got_all(self, results): if not self.shutting_down: dispatchers = [result[1] for result in results if result[0] and result[1] is not None] self.update_dispatchers(dispatchers) def update_dispatchers(self, dispatchers): raise NotImplementedError() def run(self): process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) reactor.run(installSignalHandlers=False) def _handle_SIGHUP(self, *args): log.info('Received SIGHUP, shutting down after all sessions have expired.') reactor.callFromThread(self.shutdown, graceful=True) def _handle_SIGINT(self, *args): if process.daemon: log.info('Received SIGINT, shutting down.') else: log.info('Received KeyboardInterrupt, exiting.') reactor.callFromThread(self.shutdown) def _handle_SIGTERM(self, *args): log.info('Received SIGTERM, shutting down.') reactor.callFromThread(self.shutdown) def shutdown(self, graceful=False): raise NotImplementedError() def on_shutdown(self): pass def _shutdown(self): reactor.stop() self.on_shutdown() try: from mediaproxy.sipthor import SIPThorMediaRelayBase as MediaRelayBase except ImportError: MediaRelayBase = SRVMediaRelayBase class MediaRelay(MediaRelayBase): def __init__(self): self.cred = X509Credentials(cert_name='relay') self.tls_context = TLSContext(self.cred) self.session_manager = SessionManager(self, RelayConfig.port_range.start, RelayConfig.port_range.end) self.dispatchers = set() self.dispatcher_session_count = {} self.dispatcher_connectors = {} self.old_connectors = {} self.shutting_down = False self.graceful_shutdown = False self.start_time = time() super(MediaRelay, self).__init__() @property def status(self): if self.graceful_shutdown or self.shutting_down: return 'halting' else: return 'active' def update_dispatchers(self, dispatchers): dispatchers = set(dispatchers) for new_dispatcher in dispatchers.difference(self.dispatchers): if new_dispatcher in self.old_connectors.iterkeys(): - log.debug('Restoring old dispatcher at %s:%d' % new_dispatcher) + log.info('Restoring old dispatcher at %s:%d' % new_dispatcher) self.dispatcher_connectors[new_dispatcher] = self.old_connectors.pop(new_dispatcher) else: - log.debug('Adding new dispatcher at %s:%d' % new_dispatcher) + log.info('Adding new dispatcher at %s:%d' % new_dispatcher) dispatcher_addr, dispatcher_port = new_dispatcher factory = DispatcherConnectingFactory(self, dispatcher_addr, dispatcher_port) self.dispatcher_connectors[new_dispatcher] = reactor.connectTLS(dispatcher_addr, dispatcher_port, factory, self.tls_context) for old_dispatcher in self.dispatchers.difference(dispatchers): - log.debug('Removing old dispatcher at %s:%d' % old_dispatcher) + log.info('Removing old dispatcher at %s:%d' % old_dispatcher) self.old_connectors[old_dispatcher] = self.dispatcher_connectors.pop(old_dispatcher) self._check_disconnect(old_dispatcher) self.dispatchers = dispatchers def got_command(self, dispatcher, command, headers): if command == 'summary': - summary = {'ip' : RelayConfig.relay_ip, - 'version' : __version__, - 'status' : self.status, - 'uptime' : int(time() - self.start_time), - 'session_count' : len(self.session_manager.sessions), - 'stream_count' : self.session_manager.stream_count, - 'bps_relayed' : self.session_manager.bps_relayed} + summary = {'ip': RelayConfig.relay_ip, + 'version': __version__, + 'status': self.status, + 'uptime': int(time() - self.start_time), + 'session_count': len(self.session_manager.sessions), + 'stream_count': self.session_manager.stream_count, + 'bps_relayed': self.session_manager.bps_relayed} return cjson.encode(summary) elif command == 'sessions': return cjson.encode(self.session_manager.statistics) elif command == 'update': if self.graceful_shutdown or self.shutting_down: if not self.session_manager.has_session(**headers): - log.debug('cannot add new session: media-relay is shutting down') + log.info('cannot add new session: media-relay is shutting down') return 'halting' try: local_media = self.session_manager.update_session(dispatcher, **headers) except RelayPortsExhaustedError: log.error('Could not reserve relay ports for session, all allocated ports are being used') return 'error' if local_media: return ' '.join([RelayConfig.advertised_ip or local_media[0][0]] + [str(media[1]) for media in local_media]) - else: # remove + else: # command == 'remove' session = self.session_manager.remove_session(**headers) if session is None: return 'error' else: return cjson.encode(session.statistics) def session_expired(self, session): connector = self.dispatcher_connectors.get(session.dispatcher) if connector is None: connector = self.old_connectors.get(session.dispatcher) if connector and connector.state == 'connected': connector.transport.write(' '.join(['expired', cjson.encode(session.statistics)]) + connector.factory.protocol.delimiter) else: log.warning('dispatcher for expired session is no longer online, statistics are lost!') def add_session(self, dispatcher): self.dispatcher_session_count[dispatcher] = self.dispatcher_session_count.get(dispatcher, 0) + 1 def remove_session(self, dispatcher): self.dispatcher_session_count[dispatcher] -= 1 if self.dispatcher_session_count[dispatcher] == 0: del self.dispatcher_session_count[dispatcher] if self.graceful_shutdown and not self.dispatcher_session_count: self.shutdown() elif dispatcher in self.old_connectors: self._check_disconnect(dispatcher) def _check_disconnect(self, dispatcher): connector = self.old_connectors[dispatcher] if self.dispatcher_session_count.get(dispatcher, 0) == 0: old_state = connector.state connector.factory.cancel_delayed() connector.disconnect() if old_state == "disconnected": del self.old_connectors[dispatcher] if self.shutting_down and len(self.dispatcher_connectors) + len(self.old_connectors) == 0: self._shutdown() def connector_needs_reconnect(self, connector): if connector in self.dispatcher_connectors.values(): return True else: for dispatcher, old_connector in self.old_connectors.items(): if old_connector is connector: if self.dispatcher_session_count.get(dispatcher, 0) > 0: return True else: del self.old_connectors[dispatcher] break if self.shutting_down: if len(self.old_connectors) == 0: self._shutdown() return False def shutdown(self, graceful=False): if graceful: self.graceful_shutdown = True if self.dispatcher_session_count: return if not self.shutting_down: self.shutting_down = True self.srv_monitor.cancel() self.session_manager.cleanup() if len(self.dispatcher_connectors) + len(self.old_connectors) == 0: self._shutdown() else: self.update_dispatchers([])