diff --git a/build_inplace b/build_inplace
index 084bab0..350c270 100755
--- a/build_inplace
+++ b/build_inplace
@@ -1,5 +1,5 @@
#!/bin/sh
-python setup.py build_ext --inplace "$@"
-test -d build && python setup.py clean
+python3 setup.py build_ext --inplace "$@"
+test -d build && python3 setup.py clean
diff --git a/media-dispatcher b/media-dispatcher
index 4269843..ed7eb58 100755
--- a/media-dispatcher
+++ b/media-dispatcher
@@ -1,77 +1,77 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
if __name__ == '__main__':
import mediaproxy
import sys
from application import log
from application.process import process, ProcessError
from argparse import ArgumentParser
name = 'media-dispatcher'
fullname = 'MediaProxy Dispatcher'
description = 'MediaProxy Dispatcher component'
process.configuration.user_directory = None
process.configuration.subdirectory = mediaproxy.mediaproxy_subdirectory
process.runtime.subdirectory = mediaproxy.mediaproxy_subdirectory
parser = ArgumentParser(usage='%(prog)s [options]')
parser.add_argument('--version', action='version', version='%(prog)s {}'.format(mediaproxy.__version__))
parser.add_argument('--systemd', action='store_true', help='run as a systemd simple service and log to journal')
parser.add_argument('--no-fork', action='store_false', dest='fork', help='run in the foreground and log to the terminal')
parser.add_argument('--config-dir', dest='config_directory', default=None, help='the configuration directory ({})'.format(process.configuration.system_directory), metavar='PATH')
parser.add_argument('--runtime-dir', dest='runtime_directory', default=None, help='the runtime directory ({})'.format(process.runtime.directory), metavar='PATH')
parser.add_argument('--debug', action='store_true', help='enable verbose logging')
parser.add_argument('--debug-memory', action='store_true', help='enable memory debugging')
options = parser.parse_args()
log.Formatter.prefix_format = '{record.levelname:<8s} '
if options.config_directory is not None:
process.configuration.local_directory = options.config_directory
if options.runtime_directory is not None:
process.runtime.directory = options.runtime_directory
try:
process.runtime.create_directory()
except ProcessError as e:
log.critical('Cannot start %s: %s' % (fullname, e))
sys.exit(1)
if options.systemd:
from systemd.journal import JournalHandler
log.set_handler(JournalHandler(SYSLOG_IDENTIFIER=name))
log.capture_output()
elif options.fork:
try:
process.daemonize(pidfile='{}.pid'.format(name))
except ProcessError as e:
log.critical('Cannot start %s: %s' % (fullname, e))
sys.exit(1)
log.use_syslog(name)
log.info('Starting %s %s' % (fullname, mediaproxy.__version__))
from mediaproxy.dispatcher import Dispatcher
from mediaproxy.configuration import DispatcherConfig
log.level.current = log.level.DEBUG if options.debug else DispatcherConfig.log_level
if options.debug_memory:
from application.debug.memory import memory_dump
try:
dispatcher = Dispatcher()
except Exception as e:
log.critical('Failed to create %s: %s' % (fullname, e))
if type(e) is not RuntimeError:
log.exception()
sys.exit(1)
dispatcher.run()
if options.debug_memory:
memory_dump()
diff --git a/media-relay b/media-relay
index 5c59023..40bf086 100755
--- a/media-relay
+++ b/media-relay
@@ -1,120 +1,120 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
if __name__ == '__main__':
import mediaproxy
import errno
import sys
import subprocess
from application import log
from application.process import process, ProcessError
from application.version import Version
from argparse import ArgumentParser
IP_FORWARD_FILE = '/proc/sys/net/ipv4/ip_forward'
CONNTRACK_ACCT_FILE = '/proc/sys/net/netfilter/nf_conntrack_acct'
KERNEL_VERSION_FILE = '/proc/sys/kernel/osrelease'
name = 'media-relay'
fullname = 'MediaProxy Relay'
description = 'MediaProxy Relay component'
process.configuration.user_directory = None
process.configuration.subdirectory = mediaproxy.mediaproxy_subdirectory
process.runtime.subdirectory = mediaproxy.mediaproxy_subdirectory
parser = ArgumentParser(usage='%(prog)s [options]')
parser.add_argument('--version', action='version', version='%(prog)s {}'.format(mediaproxy.__version__))
parser.add_argument('--systemd', action='store_true', help='run as a systemd simple service and log to journal')
parser.add_argument('--no-fork', action='store_false', dest='fork', help='run in the foreground and log to the terminal')
parser.add_argument('--config-dir', dest='config_directory', default=None, help='the configuration directory ({})'.format(process.configuration.system_directory), metavar='PATH')
parser.add_argument('--runtime-dir', dest='runtime_directory', default=None, help='the runtime directory ({})'.format(process.runtime.directory), metavar='PATH')
parser.add_argument('--debug', action='store_true', help='enable verbose logging')
parser.add_argument('--debug-memory', action='store_true', help='enable memory debugging')
options = parser.parse_args()
log.Formatter.prefix_format = '{record.levelname:<8s} '
if options.config_directory is not None:
process.configuration.local_directory = options.config_directory
if options.runtime_directory is not None:
process.runtime.directory = options.runtime_directory
if not sys.platform.startswith('linux'):
log.critical('Cannot start %s. A Linux host is required for operation.' % fullname)
sys.exit(1)
try:
subprocess.call(['modprobe', 'ip_tables'], env={'PATH': '/usr/sbin:/sbin:/usr/bin:/bin'})
except OSError as e:
log.critical('Cannot start %s: failed to load the ip_tables kernel module: %s' % (fullname, e))
sys.exit(1)
try:
kernel_version = Version.parse(open(KERNEL_VERSION_FILE).read().strip())
except (OSError, IOError, ValueError):
log.critical('Could not determine Linux kernel version')
sys.exit(1)
if kernel_version < Version(2, 6, 18):
log.critical('Linux kernel version 2.6.18 or newer is required to run the media relay')
sys.exit(1)
try:
ip_forward = bool(int(open(IP_FORWARD_FILE).read()))
except (OSError, IOError, ValueError):
ip_forward = False
if not ip_forward:
log.critical('IP forwarding is not available or not enabled (check %s)' % IP_FORWARD_FILE)
sys.exit(1)
try:
with open(CONNTRACK_ACCT_FILE, 'w') as acct_file:
acct_file.write('1')
except (IOError, OSError) as e:
if e.errno != errno.ENOENT:
log.critical('Could not enable conntrack rule counters (check %s): %s' % (CONNTRACK_ACCT_FILE, e))
sys.exit(1)
if options.systemd:
from systemd.journal import JournalHandler
log.set_handler(JournalHandler(SYSLOG_IDENTIFIER=name))
log.capture_output()
elif options.fork:
try:
process.daemonize(pidfile='{}.pid'.format(name))
except ProcessError as e:
log.critical('Cannot start %s: %s' % (fullname, e))
sys.exit(1)
log.use_syslog(name)
log.info('Starting %s %s' % (fullname, mediaproxy.__version__))
try:
process.wait_for_network(wait_time=10, wait_message='Waiting for network to become available...')
except KeyboardInterrupt:
sys.exit(0)
except RuntimeError as e:
log.critical('Cannot start %s: %s' % (fullname, e))
sys.exit(1)
try:
from mediaproxy.relay import MediaRelay
from mediaproxy.configuration import RelayConfig
log.level.current = log.level.DEBUG if options.debug else RelayConfig.log_level
if options.debug_memory:
from application.debug.memory import memory_dump
relay = MediaRelay()
except Exception as e:
log.critical('Failed to create %s: %s' % (fullname, e))
if type(e) is not RuntimeError:
log.exception()
sys.exit(1)
relay.run()
if options.debug_memory:
memory_dump()
diff --git a/mediaproxy/configuration/datatypes.py b/mediaproxy/configuration/datatypes.py
index f741394..e9748d3 100644
--- a/mediaproxy/configuration/datatypes.py
+++ b/mediaproxy/configuration/datatypes.py
@@ -1,113 +1,113 @@
import re
from application.configuration.datatypes import IPAddress, NetworkAddress, StringList
from gnutls import crypto
class DispatcherIPAddress(NetworkAddress):
default_port = 25060
class DispatcherManagementAddress(NetworkAddress):
default_port = 25061
class AccountingModuleList(StringList):
_valid_backends = {'database', 'radius'}
def __new__(cls, value):
proposed_backends = set(StringList.__new__(cls, value))
return list(proposed_backends & cls._valid_backends)
class DispatcherAddress(tuple):
default_port = 25060
def __new__(cls, value):
match = re.search(r"^(?P
.+?):(?P\d+)$", value)
if match:
address = str(match.group("address"))
port = int(match.group("port"))
else:
address = value
port = cls.default_port
try:
address = IPAddress(address)
is_domain = False
except ValueError:
is_domain = True
return tuple.__new__(cls, (address, port, is_domain))
class DispatcherAddressList(list):
def __init__(cls, value):
list.__init__(cls, (DispatcherAddress(dispatcher) for dispatcher in re.split(r'\s*,\s*|\s+', value)))
class PortRange(object):
"""A port range in the form start:end with start and end being even numbers in the [1024, 65536] range"""
def __init__(self, value):
self.start, self.end = [int(p) for p in value.split(':', 1)]
- allowed = xrange(1024, 65537, 2)
+ allowed = range(1024, 65537, 2)
if not (self.start in allowed and self.end in allowed and self.start < self.end):
raise ValueError("bad range: %r: ports must be even numbers in the range [1024, 65536] with start < end" % value)
def __repr__(self):
return "%s('%d:%d')" % (self.__class__.__name__, self.start, self.end)
class PositiveInteger(int):
def __new__(cls, value):
instance = int.__new__(cls, value)
if instance < 1:
raise ValueError("value must be a positive integer")
return instance
class SIPThorDomain(str):
"""A SIP Thor domain name or the keyword None"""
def __new__(cls, name):
if name is None:
return None
- elif not isinstance(name, basestring):
+ elif not isinstance(name, str):
raise TypeError("domain name must be a string, unicode or None")
if name.lower() == 'none':
return None
return name
class X509NameValidator(crypto.X509Name):
def __new__(cls, dname):
if dname.lower() == 'none':
return None
return crypto.X509Name.__new__(cls, dname)
def __init__(self, dname):
str.__init__(self)
pairs = [x.replace('\,', ',') for x in re.split(r'(?=limit), self.sessions.iteritems())]
+ obsolete = [k for k, s in filter(lambda k_s: k_s[1].expire_time and (now-k_s[1].expire_time>=limit), iter(self.sessions.items()))]
if obsolete:
[self.sessions.pop(call_id) for call_id in obsolete]
log.warning('found %d expired sessions which were not removed during the last %d hours' % (len(obsolete), round(limit / 3600.0)))
return KeepRunning
def buildProtocol(self, addr):
protocol = Factory.buildProtocol(self, addr)
protocol.ip = addr.host
protocol.logger = ProtocolLogger(name='relay {}'.format(addr.host))
protocol.logger.info('Connection established')
return protocol
def new_relay(self, relay):
old_relay = self.relays.pop(relay.ip, None)
if old_relay is not None:
relay.logger.warning('Reconnected, closing old connection')
reactor.callLater(0, old_relay.transport.connectionLost, failure.Failure(ConnectionReplaced('relay reconnected')))
self.relays[relay.ip] = relay
timer = self.cleanup_timers.pop(relay.ip, None)
if timer is not None:
timer.cancel()
defer = relay.send_command(Command('sessions'))
defer.addCallback(self._cb_purge_sessions, relay.ip)
def _cb_purge_sessions(self, result, relay_ip):
- relay_sessions = cjson.decode(result)
+ relay_sessions = json.loads(result)
relay_call_ids = [session['call_id'] for session in relay_sessions]
- for session_id, session in self.sessions.items():
+ for session_id, session in list(self.sessions.items()):
if session.expire_time is None and session.relay_ip == relay_ip and session_id not in relay_call_ids:
session.logger.warning('Relay does not have the session anymore, statistics are probably lost')
if session.dialog_id is not None:
self.dispatcher.opensips_management.end_dialog(session.dialog_id)
del self.sessions[session_id]
def send_command(self, command):
session = self.sessions.get(command.call_id, None)
if session and session.expire_time is None:
relay = session.relay_ip
if relay not in self.relays:
session.logger.error('Request {0.name!r} failed: relay no longer connected'.format(command))
raise RelayError('Request {0.name!r} failed: relay no longer connected'.format(command))
return self.relays[relay].send_command(command)
# We do not have a session for this call_id or the session is already expired
if command.name == 'update':
preferred_relay = command.parsed_headers.get('media_relay')
- try_relays = deque(protocol for protocol in self.relays.itervalues() if protocol.active and protocol.ip != preferred_relay)
+ try_relays = deque(protocol for protocol in self.relays.values() if protocol.active and protocol.ip != preferred_relay)
random.shuffle(try_relays)
if preferred_relay is not None:
protocol = self.relays.get(preferred_relay)
if protocol is not None and protocol.active:
try_relays.appendleft(protocol)
else:
log.warning('user requested media_relay %s is not available' % preferred_relay)
defer = self._try_next(try_relays, command)
defer.addCallback(self._add_session, try_relays, command)
return defer
elif command.name == 'remove' and session:
# This is the remove we received for an expired session for which we triggered dialog termination
del self.sessions[command.call_id]
return 'removed'
else:
raise RelayError('Got {0.name!r} for unknown session {0.session_id}'.format(command))
def _add_session(self, result, try_relays, command):
self.sessions[command.call_id] = RelaySession(try_relays[0], command)
return result
def _relay_error(self, failure, try_relays, command):
failure.trap(RelayError)
failed_relay = try_relays.popleft()
failed_relay.logger.warning('The {0.name!r} request failed: {1.value}'.format(command, failure))
return self._try_next(try_relays, command)
def _try_next(self, try_relays, command):
if len(try_relays) == 0:
raise RelayError('No suitable relay found')
defer = try_relays[0].send_command(command)
defer.addErrback(self._relay_error, try_relays, command)
return defer
def get_summary(self):
command = Command('summary')
- defer = DeferredList([relay.send_command(command).addErrback(self._summary_error, command, relay) for relay in self.relays.itervalues()])
+ defer = DeferredList([relay.send_command(command).addErrback(self._summary_error, command, relay) for relay in self.relays.values()])
defer.addCallback(self._got_summaries)
return defer
def _summary_error(self, failure, command, relay):
relay.logger.error('The {0.name!r} request failed: {1.value}'.format(command, failure))
- return cjson.encode(dict(status='error', ip=relay.ip))
+ return json.dumps(dict(status='error', ip=relay.ip))
def _got_summaries(self, results):
return '[%s]' % ', '.join(result for succeeded, result in results if succeeded)
def get_statistics(self):
command = Command('sessions')
- defer = DeferredList([relay.send_command(command).addErrback(self._statistics_error, command, relay) for relay in self.relays.itervalues()])
+ defer = DeferredList([relay.send_command(command).addErrback(self._statistics_error, command, relay) for relay in self.relays.values()])
defer.addCallback(self._got_statistics)
return defer
def _statistics_error(self, failure, command, relay):
relay.logger.error('The {0.name!r} request failed: {1.value}'.format(command, failure))
- return cjson.encode([])
+ return json.loads([])
def _got_statistics(self, results):
return '[%s]' % ', '.join(result[1:-1] for succeeded, result in results if succeeded and result != '[]')
def connection_lost(self, relay):
- if relay not in self.relays.itervalues():
+ if relay not in iter(self.relays.values()):
return
if relay.authenticated:
del self.relays[relay.ip]
if self.shutting_down:
if len(self.relays) == 0:
self.defer.callback(None)
else:
self.cleanup_timers[relay.ip] = reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, relay.ip)
def _do_cleanup(self, ip):
log.debug('Cleaning up after old relay at %s' % ip)
del self.cleanup_timers[ip]
- for call_id in (call_id for call_id, session in self.sessions.items() if session.relay_ip == ip):
+ for call_id in (call_id for call_id, session in list(self.sessions.items()) if session.relay_ip == ip):
del self.sessions[call_id]
def shutdown(self):
if self.shutting_down:
return
self.shutting_down = True
- for timer in self.cleanup_timers.itervalues():
+ for timer in self.cleanup_timers.values():
timer.cancel()
if len(self.relays) == 0:
retval = succeed(None)
else:
- for prot in self.relays.itervalues():
+ for prot in self.relays.values():
prot.transport.loseConnection()
self.defer = Deferred()
retval = self.defer
retval.addCallback(self._save_state)
return retval
def _save_state(self, result):
- pickle.dump(self.sessions, open(process.runtime.file('dispatcher_state'), 'w'))
+ pickle.dump(self.sessions, open(process.runtime.file('dispatcher_state'), 'wb'))
class Dispatcher(object):
def __init__(self):
self.accounting = [__import__('mediaproxy.interfaces.accounting.%s' % mod.lower(), globals(), locals(), ['']).Accounting() for mod in set(DispatcherConfig.accounting)]
self.cred = X509Credentials(cert_name='dispatcher')
self.tls_context = TLSContext(self.cred)
self.relay_factory = RelayFactory(self)
dispatcher_addr, dispatcher_port = DispatcherConfig.listen
- self.relay_listener = reactor.listenTLS(dispatcher_port, self.relay_factory, self.tls_context, interface=dispatcher_addr)
+ self.relay_listener = listenTLS(reactor, dispatcher_port, self.relay_factory, self.tls_context, interface=dispatcher_addr)
self.opensips_factory = OpenSIPSControlFactory(self)
socket_path = process.runtime.file(DispatcherConfig.socket_path)
unlink(socket_path)
self.opensips_listener = reactor.listenUNIX(socket_path, self.opensips_factory)
self.opensips_management = opensips.ManagementInterface()
self.management_factory = ManagementControlFactory(self)
management_addr, management_port = DispatcherConfig.listen_management
if DispatcherConfig.management_use_tls:
- self.management_listener = reactor.listenTLS(management_port, self.management_factory, self.tls_context, interface=management_addr)
+ self.management_listener = listenTLS(reactor, management_port, self.management_factory, self.tls_context, interface=management_addr)
else:
- self.management_listener = reactor.listenTCP(management_port, self.management_factory, interface=management_addr)
+ self.management_listener = reactor.listenTCP( management_port, self.management_factory, interface=management_addr)
def run(self):
log.debug('Using {0.__class__.__name__}'.format(reactor))
process.signals.add_handler(signal.SIGHUP, self._handle_signal)
process.signals.add_handler(signal.SIGINT, self._handle_signal)
process.signals.add_handler(signal.SIGTERM, self._handle_signal)
process.signals.add_handler(signal.SIGUSR1, self._handle_signal)
for accounting_module in self.accounting:
accounting_module.start()
reactor.run(installSignalHandlers=False)
def stop(self):
reactor.callFromThread(self._shutdown)
def send_command(self, command):
return maybeDeferred(self.relay_factory.send_command, command)
def update_statistics(self, session, stats):
session.logger.info('statistics: {}'.format(stats))
if stats['start_time'] is not None:
for accounting in self.accounting:
try:
accounting.do_accounting(stats)
- except Exception, e:
+ except Exception as e:
log.exception('An unhandled error occurred while doing accounting: %s' % e)
def _handle_signal(self, signum, frame):
if signum == signal.SIGUSR1:
# toggle debugging
if log.level.current != log.level.DEBUG:
log.level.current = log.level.DEBUG
log.info('Switched logging level to DEBUG')
else:
log.info('Switched logging level to {}'.format(DispatcherConfig.log_level))
log.level.current = DispatcherConfig.log_level
else:
# terminate program
signal_map = {signal.SIGTERM: 'Terminated', signal.SIGINT: 'Interrupted', signal.SIGHUP: 'Hangup'}
log.info(signal_map.get(signum, 'Received signal {}, exiting.'.format(signum)))
self.stop()
def _shutdown(self):
defer = DeferredList([result for result in [self.opensips_listener.stopListening(), self.management_listener.stopListening(), self.relay_listener.stopListening()] if result is not None])
defer.addCallback(lambda x: self.opensips_factory.shutdown())
defer.addCallback(lambda x: self.management_factory.shutdown())
defer.addCallback(lambda x: self.relay_factory.shutdown())
defer.addCallback(lambda x: self._stop())
def _stop(self):
for act in self.accounting:
act.stop()
reactor.stop()
diff --git a/mediaproxy/headers.py b/mediaproxy/headers.py
index c169331..15ba8ed 100644
--- a/mediaproxy/headers.py
+++ b/mediaproxy/headers.py
@@ -1,105 +1,105 @@
"""Header encoding and decoding rules for communication between the dispatcher and relay components"""
class EncodingError(Exception):
pass
class DecodingError(Exception):
pass
class MediaProxyHeaders(object):
@classmethod
def encode(cls, name, value):
func_name = "encode_%s" % name
if hasattr(cls, func_name):
return getattr(cls, func_name)(value)
else:
return value
@classmethod
def decode(cls, name, value):
func_name = "decode_%s" % name
if hasattr(cls, func_name):
return getattr(cls, func_name)(value)
else:
return value
@staticmethod
def encode_cseq(value):
return str(value)
@staticmethod
def decode_cseq(value):
try:
return int(value)
except ValueError:
raise DecodingError("Not an integer: %s" % value)
@staticmethod
def encode_type(value):
if value not in ["request", "reply"]:
raise EncodingError('"type" header should be either "request" or "reply"')
return value
@staticmethod
def decode_type(value):
if value not in ["request", "reply"]:
raise DecodingError('"type" header should be either "request" or "reply"')
return value
@staticmethod
def encode_media(value):
try:
- return ','.join(':'.join([type, ip, str(port), direction] + ['%s=%s' % (k, v) for k, v in parameters.iteritems()]) for type, ip, port, direction, parameters in value)
+ return ','.join(':'.join([type, ip, str(port), direction] + ['%s=%s' % (k, v) for k, v in parameters.items()]) for type, ip, port, direction, parameters in value)
except:
raise EncodingError("Ill-formatted media information")
@staticmethod
def decode_media(value):
try:
streams = []
for stream_data in (data for data in value.split(",") if data):
stream_data = stream_data.split(":")
type, ip, port, direction = stream_data[:4]
parameters = dict(param.split("=") for param in stream_data[4:] if param)
streams.append((type, ip, int(port), direction, parameters))
return streams
except:
raise DecodingError("Ill-formatted media header")
class CodingDict(dict):
def __init__(self, *args, **kwargs):
if not args and not kwargs:
it = []
elif kwargs:
- it = kwargs.iteritems()
+ it = iter(kwargs.items())
elif isinstance(args[0], dict):
- it = args[0].iteritems()
+ it = iter(args[0].items())
else:
try:
it = iter(args[0])
except:
dict.__init__(self, *args, **kwargs)
return
dict.__init__(self)
for key, value in it:
self.__setitem__(key, value)
class EncodingDict(CodingDict):
def __setitem__(self, key, value):
encoded_value = MediaProxyHeaders.encode(key, value)
dict.__setitem__(self, key, encoded_value)
class DecodingDict(CodingDict):
def __setitem__(self, key, value):
decoded_value = MediaProxyHeaders.decode(key, value)
dict.__setitem__(self, key, decoded_value)
diff --git a/mediaproxy/interfaces/accounting/database.py b/mediaproxy/interfaces/accounting/database.py
index 4c5c3ad..6449719 100644
--- a/mediaproxy/interfaces/accounting/database.py
+++ b/mediaproxy/interfaces/accounting/database.py
@@ -1,86 +1,86 @@
"""Implementation of database accounting"""
-import cjson
+import json
from application import log
from application.python.queue import EventQueue
from sqlobject import SQLObject, connectionForURI, sqlhub
from sqlobject import StringCol, BLOBCol, DatabaseIndex
from sqlobject.dberrors import DatabaseError, ProgrammingError, OperationalError
from mediaproxy.configuration import DatabaseConfig
if not DatabaseConfig.dburi:
raise RuntimeError('Database accounting is enabled, but the database URI is not specified in config.ini')
connection = connectionForURI(DatabaseConfig.dburi)
sqlhub.processConnection = connection
class MediaSessions(SQLObject):
class sqlmeta:
table = DatabaseConfig.sessions_table
createSQL = {'mysql': 'ALTER TABLE %s ENGINE MyISAM' % DatabaseConfig.sessions_table}
cacheValues = False
call_id = StringCol(length=255, dbName=DatabaseConfig.callid_column, notNone=True)
from_tag = StringCol(length=64, dbName=DatabaseConfig.fromtag_column, notNone=True)
to_tag = StringCol(length=64, dbName=DatabaseConfig.totag_column)
info = BLOBCol(length=2**24-1, dbName=DatabaseConfig.info_column) # 2**24-1 makes it a mediumblob in mysql, that can hold 16 million bytes
# Indexes
callid_idx = DatabaseIndex('call_id', 'from_tag', 'to_tag', unique=True)
try:
MediaSessions.createTable(ifNotExists=True)
except OperationalError as e:
log.error("cannot create the `%s' table: %s" % (DatabaseConfig.sessions_table, e))
log.info("please make sure that the `%s' user has the CREATE and ALTER rights on the `%s' database" % (connection.user, connection.db))
log.info('then restart the dispatcher, or you can create the table yourself using the following definition:')
log.info('----------------- >8 -----------------')
sql, constraints = MediaSessions.createTableSQL()
statements = ';\n'.join([sql] + constraints) + ';'
log.info(statements)
log.info('----------------- >8 -----------------')
# raise RuntimeError(str(e))
class Accounting(object):
def __init__(self):
self.handler = DatabaseAccounting()
def start(self):
self.handler.start()
def do_accounting(self, stats):
self.handler.put(stats)
def stop(self):
self.handler.stop()
self.handler.join()
class DatabaseAccounting(EventQueue):
def __init__(self):
EventQueue.__init__(self, self.do_accounting)
def do_accounting(self, stats):
sqlrepr = connection.sqlrepr
names = ', '.join([DatabaseConfig.callid_column, DatabaseConfig.fromtag_column, DatabaseConfig.totag_column, DatabaseConfig.info_column])
- values = ', '.join((sqlrepr(v) for v in [stats['call_id'], stats['from_tag'], stats['to_tag'], cjson.encode(stats)]))
+ values = ', '.join((sqlrepr(v) for v in [stats['call_id'], stats['from_tag'], stats['to_tag'], json.dumps(stats)]))
q = 'INSERT INTO %s (%s) VALUES (%s)' % (DatabaseConfig.sessions_table, names, values)
try:
try:
connection.query(q)
- except ProgrammingError, e:
+ except ProgrammingError as e:
try:
MediaSessions.createTable(ifNotExists=True)
except OperationalError:
raise e
else:
connection.query(q)
except DatabaseError as e:
log.error('failed to insert record into database: %s' % e)
diff --git a/mediaproxy/interfaces/accounting/radius.py b/mediaproxy/interfaces/accounting/radius.py
index 04c68aa..8a35239 100644
--- a/mediaproxy/interfaces/accounting/radius.py
+++ b/mediaproxy/interfaces/accounting/radius.py
@@ -1,128 +1,131 @@
"""Implementation of RADIUS accounting"""
from application import log
from application.process import process
from application.python.queue import EventQueue
import pyrad.client
import pyrad.dictionary
from mediaproxy.configuration import RadiusConfig
try:
from pyrad.dictfile import DictFile
except ImportError:
# helper class to make pyrad support the $INCLUDE statement in dictionary files
class RadiusDictionaryFile(object):
def __init__(self, base_file_name):
self.file_names = [base_file_name]
self.fd_stack = [open(base_file_name)]
def readlines(self):
while True:
line = self.fd_stack[-1].readline()
if line:
if line.startswith('$INCLUDE'):
file_name = line.rstrip('\n').split(None, 1)[1]
if file_name not in self.file_names:
self.file_names.append(file_name)
self.fd_stack.append(open(file_name))
continue
else:
yield line
else:
self.fd_stack.pop()
if len(self.fd_stack) == 0:
return
else:
del DictFile
class RadiusDictionaryFile(str):
pass
class Accounting(object):
def __init__(self):
self.handler = RadiusAccounting()
def start(self):
self.handler.start()
def do_accounting(self, stats):
self.handler.put(stats)
def stop(self):
self.handler.stop()
self.handler.join()
class RadiusAccounting(EventQueue, pyrad.client.Client):
def __init__(self):
main_config_file = process.configuration.file(RadiusConfig.config_file)
if main_config_file is None:
raise RuntimeError('Cannot find the radius configuration file: %r' % RadiusConfig.config_file)
try:
config = dict(line.rstrip('\n').split(None, 1) for line in open(main_config_file) if len(line.split(None, 1)) == 2 and not line.startswith('#'))
secrets = dict(line.rstrip('\n').split(None, 1) for line in open(config['servers']) if len(line.split(None, 1)) == 2 and not line.startswith('#'))
server = config['acctserver']
try:
server, acctport = server.split(':')
acctport = int(acctport)
except ValueError:
log.info('Could not load additional RADIUS dictionary file: %r' % RadiusConfig.additional_dictionary)
acctport = 1813
log.info('Using RADIUS server at %s:%d' % (server, acctport))
secret = secrets[server]
log.info("Using RADIUS dictionary file %s" % config['dictionary'])
dicts = [RadiusDictionaryFile(config['dictionary'])]
if RadiusConfig.additional_dictionary:
additional_dictionary = process.configuration.file(RadiusConfig.additional_dictionary)
if additional_dictionary:
log.info("Using additional RADIUS dictionary file %s" % RadiusConfig.additional_dictionary)
dicts.append(RadiusDictionaryFile(additional_dictionary))
else:
log.warning('Could not load additional RADIUS dictionary file: %r' % RadiusConfig.additional_dictionary)
raddict = pyrad.dictionary.Dictionary(*dicts)
timeout = int(config['radius_timeout'])
retries = int(config['radius_retries'])
except Exception:
log.critical('cannot read the RADIUS configuration file %s' % RadiusConfig.config_file)
raise
pyrad.client.Client.__init__(self, server, 1812, acctport, 3799, secret, raddict)
self.timeout = timeout
self.retries = retries
if 'bindaddr' in config and config['bindaddr'] != '*':
self.bind((config['bindaddr'], 0))
EventQueue.__init__(self, self.do_accounting)
def do_accounting(self, stats):
attrs = {}
attrs['Acct-Status-Type'] = 'Update'
attrs['User-Name'] = 'mediaproxy@default'
attrs['Acct-Session-Id'] = stats['call_id']
attrs['Acct-Session-Time'] = stats['duration']
attrs['Acct-Input-Octets'] = sum(stream_stats['caller_bytes'] for stream_stats in stats['streams'])
attrs['Acct-Output-Octets'] = sum(stream_stats['callee_bytes'] for stream_stats in stats['streams'])
attrs['Sip-From-Tag'] = stats['from_tag']
attrs['Sip-To-Tag'] = stats['to_tag'] or ''
attrs['NAS-IP-Address'] = stats['streams'][0]['caller_local'].split(':')[0]
attrs['Sip-User-Agents'] = (stats['caller_ua'] + '+' + stats['callee_ua'])[:253]
attrs['Sip-Applications'] = ', '.join(sorted(set(stream['media_type'] for stream in stats['streams'] if stream['start_time'] != stream['end_time'])))[:253]
attrs['Media-Codecs'] = ', '.join(stream['caller_codec'] for stream in stats['streams'])[:253]
if stats['timed_out'] and not stats.get('all_streams_ice', False):
attrs['Media-Info'] = 'timeout'
elif stats.get('all_streams_ice', False):
attrs['Media-Info'] = 'ICE session'
else:
attrs['Media-Info'] = ''
for stream in stats['streams']:
if stream['post_dial_delay'] is not None:
attrs['Acct-Delay-Time'] = int(stream['post_dial_delay'])
break
+ if isinstance(self.secret, str):
+ scr_bn = self.secret.encode('utf-8')
+ self.secret = scr_bn
try:
self.SendPacket(self.CreateAcctPacket(**attrs))
- except Exception, e:
+ except Exception as e:
log.error('Failed to send radius accounting record: %s' % e)
diff --git a/mediaproxy/interfaces/opensips.py b/mediaproxy/interfaces/opensips.py
index ce2f3cf..d0e1de4 100644
--- a/mediaproxy/interfaces/opensips.py
+++ b/mediaproxy/interfaces/opensips.py
@@ -1,240 +1,236 @@
import json
import socket
-import urlparse
+import urllib.parse
from abc import ABCMeta, abstractmethod, abstractproperty
from application import log
from application.python.types import Singleton
from application.process import process
from application.system import unlink
from random import getrandbits
from twisted.internet import reactor, defer
from twisted.internet.protocol import DatagramProtocol
from twisted.python.failure import Failure
from mediaproxy.configuration import OpenSIPSConfig
class Error(Exception):
pass
class TimeoutError(Error):
pass
class OpenSIPSError(Error):
pass
class NegativeReplyError(OpenSIPSError):
def __init__(self, code, message):
super(NegativeReplyError, self).__init__(code, message)
self.code = code
self.message = message
def __repr__(self):
return '{0.__class__.__name__}({0.code!r}, {0.message!r})'.format(self)
def __str__(self):
return '[{0.code}] {0.message}'.format(self)
-class Request(object):
- __metaclass__ = ABCMeta
-
+class Request(object, metaclass=ABCMeta):
method = abstractproperty()
@abstractmethod
def __init__(self, *args):
self.id = '{:x}'.format(getrandbits(32))
self.args = list(args)
self.deferred = defer.Deferred()
@property
def __data__(self):
return dict(jsonrpc='2.0', id=self.id, method=self.method, params=self.args)
@abstractmethod
def process_response(self, response):
raise NotImplementedError
# noinspection PyAbstractClass
class BooleanRequest(Request):
"""A request that returns True if successful, False otherwise"""
def process_response(self, response):
return not isinstance(response, Failure)
class AddressReload(BooleanRequest):
method = 'address_reload'
def __init__(self):
super(AddressReload, self).__init__()
class DomainReload(BooleanRequest):
method = 'domain_reload'
def __init__(self):
super(DomainReload, self).__init__()
class EndDialog(BooleanRequest):
method = 'dlg_end_dlg'
def __init__(self, dialog_id):
super(EndDialog, self).__init__(dialog_id)
class RefreshWatchers(BooleanRequest):
method = 'refresh_watchers'
def __init__(self, account, refresh_type):
super(RefreshWatchers, self).__init__('sip:{}'.format(account), 'presence', refresh_type)
class UpdateSubscriptions(BooleanRequest):
method = 'rls_update_subscriptions'
def __init__(self, account):
super(UpdateSubscriptions, self).__init__('sip:{}'.format(account))
class GetOnlineDevices(Request):
method = 'ul_show_contact'
def __init__(self, account):
super(GetOnlineDevices, self).__init__(OpenSIPSConfig.location_table, account)
def process_response(self, response):
if isinstance(response, Failure):
if response.type is NegativeReplyError and response.value.code == 404:
return []
return response
- return [ContactData(contact) for contact in response[u'Contacts']]
+ return [ContactData(contact) for contact in response['Contacts']]
class ContactData(dict):
- __fields__ = {u'contact', u'expires', u'received', u'user_agent'}
+ __fields__ = {'contact', 'expires', 'received', 'user_agent'}
def __init__(self, data):
- super(ContactData, self).__init__({key: value for key, value in ((key.lower().replace(u'-', u'_'), value) for key, value in data.iteritems()) if key in self.__fields__})
- self.setdefault(u'user_agent', None)
- if u'received' in self:
- parsed_received = urlparse.parse_qs(self[u'received'])
- if u'target' in parsed_received:
- self[u'NAT_contact'] = parsed_received[u'target'][0]
+ super(ContactData, self).__init__({key: value for key, value in ((key.lower().replace('-', '_'), value) for key, value in data.items()) if key in self.__fields__})
+ self.setdefault('user_agent', None)
+ if 'received' in self:
+ parsed_received = urllib.parse.parse_qs(self['received'])
+ if 'target' in parsed_received:
+ self['NAT_contact'] = parsed_received['target'][0]
else:
- self[u'NAT_contact'] = self[u'received']
- del self[u'received']
+ self['NAT_contact'] = self['received']
+ del self['received']
else:
- self[u'NAT_contact'] = self[u'contact']
+ self['NAT_contact'] = self['contact']
class UNIXSocketProtocol(DatagramProtocol):
noisy = False
def datagramReceived(self, data, address):
log.debug('Got MI response: {}'.format(data))
try:
response = json.loads(data)
except ValueError:
code, _, message = data.partition(' ')
try:
code = int(code)
except ValueError:
log.error('MI response from OpenSIPS cannot be parsed (neither JSON nor status reply)')
return
# we got one of the 'code message' type of replies. This means either parsing error or internal error in OpenSIPS.
# if we only have one request pending, we can associate the response with it, otherwise is impossible to tell to
# which request the response corresponds. The failed request will fail with timeout later.
if len(self.transport.requests) == 1:
_, request = self.transport.requests.popitem()
request.deferred.errback(Failure(NegativeReplyError(code, message)))
log.error('MI request {.method} failed with: {} {}'.format(request, code, message))
else:
log.error('Got MI status reply from OpenSIPS that cannot be associated with a request: {!r}'.format(data))
else:
try:
request_id = response['id']
except KeyError:
log.error('MI JSON response from OpenSIPS lacks id field')
return
if request_id not in self.transport.requests:
log.error('MI JSON response from OpenSIPS has unknown id: {!r}'.format(request_id))
return
request = self.transport.requests.pop(request_id)
if 'result' in response:
request.deferred.callback(response['result'])
elif 'error' in response:
log.error('MI request {0.method} failed with: {1[error][code]} {1[error][message]}'.format(request, response))
request.deferred.errback(Failure(NegativeReplyError(response['error']['code'], response['error']['message'])))
else:
log.error('Invalid MI JSON response from OpenSIPS')
request.deferred.errback(Failure(OpenSIPSError('Invalid MI JSON response from OpenSIPS')))
class UNIXSocketConnection(object):
timeout = 3
def __init__(self):
socket_path = process.runtime.file('opensips.sock')
unlink(socket_path)
self.path = socket_path
self.transport = reactor.listenUNIXDatagram(self.path, UNIXSocketProtocol())
self.transport.requests = {}
reactor.addSystemEventTrigger('during', 'shutdown', self.close)
def close(self):
- for request in self.transport.requests.values():
+ for request in list(self.transport.requests.values()):
if not request.deferred.called:
request.deferred.errback(Error('shutting down'))
self.transport.requests.clear()
self.transport.stopListening()
unlink(self.path)
def send(self, request):
try:
self.transport.write(json.dumps(request.__data__), OpenSIPSConfig.socket_path)
except socket.error as e:
log.error("cannot write request to %s: %s" % (OpenSIPSConfig.socket_path, e[1]))
request.deferred.errback(Failure(Error("Cannot send MI request %s to OpenSIPS" % request.method)))
else:
self.transport.requests[request.id] = request
request.deferred.addBoth(request.process_response)
reactor.callLater(self.timeout, self._did_timeout, request)
log.debug('Send MI request: {}'.format(request.__data__))
return request.deferred
def _did_timeout(self, request):
if not request.deferred.called:
request.deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout")))
self.transport.requests.pop(request.id)
-class ManagementInterface(object):
- __metaclass__ = Singleton
-
+class ManagementInterface(object, metaclass=Singleton):
def __init__(self):
self.connection = UNIXSocketConnection()
def reload_domains(self):
return self.connection.send(DomainReload())
def reload_addresses(self):
return self.connection.send(AddressReload())
def end_dialog(self, dialog_id):
return self.connection.send(EndDialog(dialog_id))
def get_online_devices(self, account):
return self.connection.send(GetOnlineDevices(account))
def refresh_watchers(self, account, refresh_type):
return self.connection.send(RefreshWatchers(account, refresh_type))
def update_subscriptions(self, account):
return self.connection.send(UpdateSubscriptions(account))
diff --git a/mediaproxy/mediacontrol.py b/mediaproxy/mediacontrol.py
index 3a6f1e2..ed487ee 100644
--- a/mediaproxy/mediacontrol.py
+++ b/mediaproxy/mediacontrol.py
@@ -1,827 +1,829 @@
import hashlib
import struct
from application import log
from base64 import b64encode as base64_encode
from itertools import chain
from collections import deque
from operator import attrgetter
from time import time
from twisted.internet import reactor
from twisted.internet.interfaces import IReadDescriptor
from twisted.internet.protocol import DatagramProtocol
from twisted.internet.error import CannotListenError
from twisted.python.log import Logger
-from zope.interface import implements
+from zope.interface import implementer
from mediaproxy.configuration import RelayConfig
from mediaproxy.interfaces.system import _conntrack
from mediaproxy.iputils import is_routable_ip
from mediaproxy.scheduler import RecurrentCall, KeepRunning
UDP_TIMEOUT_FILE = '/proc/sys/net/ipv4/netfilter/ip_conntrack_udp_timeout_stream'
rtp_payloads = {
0: 'G711u', 1: '1016', 2: 'G721', 3: 'GSM', 4: 'G723', 5: 'DVI4', 6: 'DVI4',
7: 'LPC', 8: 'G711a', 9: 'G722', 10: 'L16', 11: 'L16', 14: 'MPA', 15: 'G728',
18: 'G729', 25: 'CelB', 26: 'JPEG', 28: 'nv', 31: 'H261', 32: 'MPV', 33: 'MP2T',
34: 'H263'
}
class RelayPortsExhaustedError(Exception):
pass
if RelayConfig.relay_ip is None:
raise RuntimeError('Could not determine default host IP; either add default route or specify relay IP manually')
class SessionLogger(log.ContextualLogger):
def __init__(self, session):
super(SessionLogger, self).__init__(logger=log.get_logger()) # use the main logger as backend
self.session_id = session.call_id
def apply_context(self, message):
return '[session {0.session_id}] {1}'.format(self, message) if message != '' else ''
class Address(object):
"""Representation of an endpoint address"""
def __init__(self, host, port, in_use=True, got_rtp=False):
self.host = host
self.port = port
- self.in_use = self.__nonzero__() and in_use
+ self.in_use = self.__bool__() and in_use
self.got_rtp = got_rtp
def __len__(self):
return 2
- def __nonzero__(self):
+ def __bool__(self):
return None not in (self.host, self.port)
def __getitem__(self, index):
return (self.host, self.port)[index]
def __contains__(self, item):
return item in (self.host, self.port)
def __iter__(self):
yield self.host
yield self.port
def __str__(self):
- return self.__nonzero__() and ('%s:%d' % (self.host, self.port)) or 'Unknown'
+ return self.__bool__() and ('%s:%d' % (self.host, self.port)) or 'Unknown'
def __repr__(self):
return '%s(%r, %r, in_use=%r, got_rtp=%r)' % (self.__class__.__name__, self.host, self.port, self.in_use, self.got_rtp)
def forget(self):
self.host, self.port, self.in_use, self.got_rtp = None, None, False, False
@property
def unknown(self):
return None in (self.host, self.port)
@property
def obsolete(self):
- return self.__nonzero__() and not self.in_use
+ return self.__bool__() and not self.in_use
class Counters(dict):
def __add__(self, other):
n = Counters(self)
- for k, v in other.iteritems():
+ for k, v in other.items():
n[k] += v
return n
def __iadd__(self, other):
- for k, v in other.iteritems():
+ for k, v in other.items():
self[k] += v
return self
@property
def caller_bytes(self):
return self['caller_bytes']
@property
def callee_bytes(self):
return self['callee_bytes']
@property
def caller_packets(self):
return self['caller_packets']
@property
def callee_packets(self):
return self['callee_packets']
@property
def relayed_bytes(self):
return self['caller_bytes'] + self['callee_bytes']
@property
def relayed_packets(self):
return self['caller_packets'] + self['callee_packets']
class StreamListenerProtocol(DatagramProtocol):
noisy = False
def __init__(self):
self.cb_func = None
self.sdp = None
self.send_packet_count = 0
self.stun_queue = []
- def datagramReceived(self, data, (host, port)):
+ def datagramReceived(self, data, addr):
+ (host, port) = addr
if self.cb_func is not None:
self.cb_func(host, port, data)
def set_remote_sdp(self, ip, port):
if is_routable_ip(ip):
self.sdp = ip, port
else:
self.sdp = None
def send(self, data, is_stun, ip=None, port=None):
if is_stun:
self.stun_queue.append(data)
if ip is None or port is None:
# this means that we have not received any packets from this host yet,
# so we have not learnt its address
if self.sdp is None:
# we can't do anything if we haven't received the SDP IP yet or
# it was in a private range
return
ip, port = self.sdp
# we learnt the IP, empty the STUN packets queue
if self.stun_queue:
for data in self.stun_queue:
self.transport.write(data, (ip, port))
self.stun_queue = []
if not is_stun:
if not self.send_packet_count % RelayConfig.userspace_transmit_every:
self.transport.write(data, (ip, port))
self.send_packet_count += 1
def _stun_test(data):
# Check if data is a STUN request and if it's a binding request
if len(data) < 20:
return False, False
msg_type, msg_len, magic = struct.unpack('!HHI', data[:8])
if msg_type & 0xc == 0 and magic == 0x2112A442:
if msg_type == 0x0001:
return True, True
else:
return True, False
else:
return False, False
class MediaSubParty(object):
def __init__(self, substream, listener):
self.substream = substream
self.logger = substream.logger
self.listener = listener
self.listener.protocol.cb_func = self.got_data
self.remote = Address(None, None)
host = self.listener.protocol.transport.getHost()
self.local = Address(host.host, host.port)
self.timer = None
self.codec = 'Unknown'
self.got_stun_probing = False
self.reset()
def reset(self):
if self.timer and self.timer.active():
self.timer.cancel()
self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, 'no-traffic timeout', RelayConfig.stream_timeout)
self.remote.in_use = False # keep remote address around but mark it as obsolete
self.remote.got_rtp = False
self.got_stun_probing = False
self.listener.protocol.send_packet_count = 0
def before_hold(self):
if self.timer and self.timer.active():
self.timer.cancel()
self.timer = reactor.callLater(RelayConfig.on_hold_timeout, self.substream.expired, 'on hold timeout', RelayConfig.on_hold_timeout)
def after_hold(self):
if self.timer and self.timer.active():
self.timer.cancel()
if not self.remote.in_use:
self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, 'no-traffic timeout', RelayConfig.stream_timeout)
def got_data(self, host, port, data):
if (host, port) == tuple(self.remote):
if self.remote.obsolete:
# the received packet matches the previously used IP/port,
# which has been made obsolete, so ignore it
return
else:
if self.remote.in_use:
# the received packet is different than the recorded IP/port,
# so we will discard it
return
# we have learnt the remote IP/port
self.remote.host, self.remote.port = host, port
self.remote.in_use = True
self.logger.info('discovered peer: %s' % self.substream.stream)
is_stun, is_binding_request = _stun_test(data)
self.substream.send_data(self, data, is_stun)
if not self.remote.got_rtp and not is_stun:
# This is the first RTP packet received
self.remote.got_rtp = True
if self.timer:
if self.timer.active():
self.timer.cancel()
self.timer = None
if self.codec == 'Unknown' and self.substream is self.substream.stream.rtp:
try:
- pt = ord(data[1]) & 127
+ pt = data[1] & 127
except IndexError:
pass
else:
if pt > 95:
self.codec = 'Dynamic(%d)' % pt
elif pt in rtp_payloads:
self.codec = rtp_payloads[pt]
else:
self.codec = 'Unknown(%d)' % pt
self.substream.check_create_conntrack()
if is_binding_request:
self.got_stun_probing = True
def cleanup(self):
if self.timer and self.timer.active():
self.timer.cancel()
self.timer = None
self.listener.protocol.cb_func = None
self.substream = None
class MediaSubStream(object):
def __init__(self, stream, listener_caller, listener_callee):
self.stream = stream
self.logger = stream.logger
self.forwarding_rule = None
self.caller = MediaSubParty(self, listener_caller)
self.callee = MediaSubParty(self, listener_callee)
self._counters = Counters(caller_bytes=0, callee_bytes=0, caller_packets=0, callee_packets=0)
@property
def counters(self):
"""Accumulated counters from all the forwarding rules the stream had"""
if self.forwarding_rule is None:
return self._counters
else:
try:
return self._counters + self.forwarding_rule.counters
except _conntrack.Error:
return self._counters
def _stop_relaying(self):
if self.forwarding_rule is not None:
try:
self._counters += self.forwarding_rule.counters
except _conntrack.Error:
pass
self.forwarding_rule = None
def reset(self, party):
if party == 'caller':
self.caller.reset()
else:
self.callee.reset()
self._stop_relaying()
def check_create_conntrack(self):
if self.stream.first_media_time is None:
self.stream.first_media_time = time()
if self.caller.remote.in_use and self.caller.remote.got_rtp and self.callee.remote.in_use and self.callee.remote.got_rtp:
self.forwarding_rule = _conntrack.ForwardingRule(self.caller.remote, self.caller.local, self.callee.remote, self.callee.local, self.stream.session.mark)
self.forwarding_rule.expired_func = self.conntrack_expired
def send_data(self, source, data, is_stun):
if source is self.caller:
dest = self.callee
else:
dest = self.caller
if dest.remote:
# if we have already learnt the remote address of the destination, use that
ip, port = dest.remote.host, dest.remote.port
dest.listener.protocol.send(data, is_stun, ip, port)
else:
# otherwise use the IP/port specified in the SDP, if public
dest.listener.protocol.send(data, is_stun)
def conntrack_expired(self):
try:
timeout_wait = int(open(UDP_TIMEOUT_FILE).read())
except:
timeout_wait = 0
self.expired('conntrack timeout', timeout_wait)
def expired(self, reason, timeout_wait):
self._stop_relaying()
self.stream.substream_expired(self, reason, timeout_wait)
def cleanup(self):
self.caller.cleanup()
self.callee.cleanup()
self._stop_relaying()
self.stream = None
class MediaParty(object):
def __init__(self, stream):
self.manager = stream.session.manager
self.logger = stream.logger
self._remote_sdp = None
self.is_on_hold = False
self.uses_ice = False
while True:
self.listener_rtp = None
self.ports = port_rtp, port_rtcp = self.manager.get_ports()
try:
self.listener_rtp = reactor.listenUDP(port_rtp, StreamListenerProtocol(), interface=RelayConfig.relay_ip)
self.listener_rtcp = reactor.listenUDP(port_rtcp, StreamListenerProtocol(), interface=RelayConfig.relay_ip)
except CannotListenError:
if self.listener_rtp is not None:
self.listener_rtp.stopListening()
self.manager.set_bad_ports(self.ports)
self.logger.warning('Cannot use port pair %d/%d' % self.ports)
else:
break
def _get_remote_sdp(self):
return self._remote_sdp
- def _set_remote_sdp(self, (ip, port)):
+ def _set_remote_sdp(self, addr):
+ (ip, port) = addr
self._remote_sdp = ip, port
self.listener_rtp.protocol.set_remote_sdp(ip, port)
remote_sdp = property(_get_remote_sdp, _set_remote_sdp)
def cleanup(self):
self.listener_rtp.stopListening()
self.listener_rtcp.stopListening()
self.manager.free_ports(self.ports)
self.manager = None
class MediaStream(object):
def __init__(self, session, media_type, media_ip, media_port, direction, media_parameters, initiating_party):
self.is_alive = True
self.session = session # type: Session
self.logger = session.logger
self.media_type = media_type
self.caller = MediaParty(self)
self.callee = MediaParty(self)
self.rtp = MediaSubStream(self, self.caller.listener_rtp, self.callee.listener_rtp)
self.rtcp = MediaSubStream(self, self.caller.listener_rtcp, self.callee.listener_rtcp)
getattr(self, initiating_party).remote_sdp = (media_ip, media_port)
getattr(self, initiating_party).uses_ice = (media_parameters.get('ice', 'no') == 'yes')
self.check_hold(initiating_party, direction, media_ip)
self.create_time = time()
self.first_media_time = None
self.start_time = None
self.end_time = None
self.status = 'active'
self.timeout_wait = 0
def __str__(self):
if self.caller.remote_sdp is None:
src = 'Unknown'
else:
src = '%s:%d' % self.caller.remote_sdp
if self.caller.is_on_hold:
src += ' ON HOLD'
if self.caller.uses_ice:
src += ' (ICE)'
if self.callee.remote_sdp is None:
dst = 'Unknown'
else:
dst = '%s:%d' % self.callee.remote_sdp
if self.callee.is_on_hold:
dst += ' ON HOLD'
if self.callee.uses_ice:
dst += ' (ICE)'
rtp = self.rtp
rtcp = self.rtcp
return '(%s) %s (RTP: %s, RTCP: %s) <-> %s <-> %s <-> %s (RTP: %s, RTCP: %s)' % (
self.media_type, src, rtp.caller.remote, rtcp.caller.remote, rtp.caller.local, rtp.callee.local, dst, rtp.callee.remote, rtcp.callee.remote)
@property
def counters(self):
return self.rtp.counters + self.rtcp.counters
@property
def is_on_hold(self):
return self.caller.is_on_hold or self.callee.is_on_hold
def check_hold(self, party, direction, ip):
previous_hold = self.is_on_hold
party = getattr(self, party)
if direction == 'sendonly' or direction == 'inactive':
party.is_on_hold = True
elif ip == '0.0.0.0':
party.is_on_hold = True
else:
party.is_on_hold = False
if previous_hold and not self.is_on_hold:
for substream in [self.rtp, self.rtcp]:
for subparty in [substream.caller, substream.callee]:
self.status = 'active'
subparty.after_hold()
if not previous_hold and self.is_on_hold:
for substream in [self.rtp, self.rtcp]:
for subparty in [substream.caller, substream.callee]:
self.status = 'on hold'
subparty.before_hold()
def reset(self, party, media_ip, media_port):
self.rtp.reset(party)
self.rtcp.reset(party)
getattr(self, party).remote_sdp = (media_ip, media_port)
def substream_expired(self, substream, reason, timeout_wait):
if substream is self.rtp and self.caller.uses_ice and self.callee.uses_ice:
reason = 'unselected ICE candidate'
self.logger.info('RTP stream expired: {}'.format(reason))
if not substream.caller.got_stun_probing and not substream.callee.got_stun_probing:
self.logger.info('unselected ICE candidate, but no STUN was received')
if substream is self.rtcp:
# Forget about the remote addresses, this will cause any
# re-occurrence of the same traffic to be forwarded again
substream.caller.remote.forget()
substream.caller.listener.protocol.send_packet_count = 0
substream.callee.remote.forget()
substream.callee.listener.protocol.send_packet_count = 0
else:
session = self.session
self.cleanup(reason)
self.timeout_wait = timeout_wait
session.stream_expired(self)
def cleanup(self, status='closed'):
if self.is_alive:
self.is_alive = False
self.status = status
self.caller.cleanup()
self.callee.cleanup()
self.rtp.cleanup()
self.rtcp.cleanup()
self.session = None
self.end_time = time()
class Session(object):
def __init__(self, manager, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media_list, is_downstream, is_caller_cseq, mark=0):
self.manager = manager
self.dispatcher = dispatcher
- self.session_id = base64_encode(hashlib.md5(call_id).digest()).rstrip('=')
+ self.session_id = base64_encode(hashlib.md5(call_id.encode()).digest()).rstrip(b'=')
self.call_id = call_id
self.from_tag = from_tag
self.to_tag = None
self.mark = mark
self.from_uri = from_uri
self.to_uri = to_uri
self.caller_ua = None
self.callee_ua = None
self.cseq = None
self.previous_cseq = None
self.streams = {}
self.start_time = None
self.end_time = None
self.logger = SessionLogger(self)
self.logger.info('created: from-tag {0.from_tag})'.format(self))
self.update_media(cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq)
def update_media(self, cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq):
if self.cseq is None:
old_cseq = (0, 0)
else:
old_cseq = self.cseq
if is_caller_cseq:
cseq = (cseq, old_cseq[1])
if self.to_tag is None and to_tag is not None:
self.to_tag = to_tag
else:
cseq = (old_cseq[0], cseq)
if is_downstream:
party = 'caller'
if self.caller_ua is None:
self.caller_ua = user_agent
else:
party = 'callee'
if self.callee_ua is None:
self.callee_ua = user_agent
if self.cseq is None or cseq > self.cseq:
if not media_list:
return
self.logger.info('got SDP offer')
self.streams[cseq] = new_streams = []
if self.cseq is None:
old_streams = []
else:
old_streams = self.streams[self.cseq]
for media_type, media_ip, media_port, media_direction, media_parameters in media_list:
for old_stream in old_streams:
old_remote = getattr(old_stream, party).remote_sdp
if old_remote is not None:
old_ip, old_port = old_remote
else:
old_ip, old_port = None, None
if old_stream.is_alive and old_stream.media_type == media_type and ((media_ip, media_port) in ((old_ip, old_port), ('0.0.0.0', old_port), (old_ip, 0))):
stream = old_stream
stream.check_hold(party, media_direction, media_ip)
if media_port == 0:
self.logger.info('disabled stream: %s', stream)
else:
self.logger.info('retained stream: %s', stream)
break
else:
stream = MediaStream(self, media_type, media_ip, media_port, media_direction, media_parameters, party)
self.logger.info('proposed stream: %s' % stream)
if media_port == 0:
stream.cleanup()
new_streams.append(stream)
if self.previous_cseq is not None:
for stream in self.streams[self.previous_cseq]:
if stream not in self.streams[self.cseq] + new_streams:
stream.cleanup()
self.previous_cseq = self.cseq
self.cseq = cseq
elif self.cseq == cseq:
self.logger.info('got SDP answer')
now = time()
if self.start_time is None:
self.start_time = now
current_streams = self.streams[cseq]
for stream in current_streams:
if stream.start_time is None:
stream.start_time = now
if to_tag is not None and not media_list:
return
if len(media_list) < len(current_streams):
for stream in current_streams[len(media_list):]:
self.logger.info('removed! stream: %s' % stream)
stream.cleanup('rejected')
for stream, (media_type, media_ip, media_port, media_direction, media_parameters) in zip(current_streams, media_list):
if stream.media_type != media_type:
raise ValueError('Media types do not match: %r and %r' % (stream.media_type, media_type))
if media_port == 0:
if stream.is_alive:
self.logger.info('rejected stream: %s' % stream)
else:
self.logger.info('disabled stream: %s' % stream)
stream.cleanup('rejected')
continue
stream.check_hold(party, media_direction, media_ip)
party_info = getattr(stream, party)
party_info.uses_ice = (media_parameters.get('ice', 'no') == 'yes')
if party_info.remote_sdp is None or party_info.remote_sdp[0] == '0.0.0.0':
party_info.remote_sdp = (media_ip, media_port)
self.logger.info('accepted stream: %s' % stream)
else:
if party_info.remote_sdp[1] != media_port or (party_info.remote_sdp[0] != media_ip != '0.0.0.0'):
stream.reset(party, media_ip, media_port)
self.logger.info('updating stream: %s' % stream)
else:
self.logger.info('retained stream: %s' % stream)
if self.previous_cseq is not None:
for stream in [stream for stream in self.streams[self.previous_cseq] if stream not in current_streams]:
self.logger.info('removing stream: %s' % stream)
stream.cleanup()
else:
self.logger.info('got old CSeq %d:%d, ignoring' % cseq)
def get_local_media(self, is_downstream, cseq, is_caller_cseq):
if is_caller_cseq:
pos = 0
else:
pos = 1
try:
- cseq = max(key for key in self.streams.keys() if key[pos] == cseq)
+ cseq = max(key for key in list(self.streams.keys()) if key[pos] == cseq)
except ValueError:
return None
if is_downstream:
retval = [(stream.status in ['active', 'on hold']) and tuple(stream.rtp.callee.local) or (stream.rtp.callee.local.host, 0) for stream in self.streams[cseq]]
else:
retval = [(stream.status in ['active', 'on hold']) and tuple(stream.rtp.caller.local) or (stream.rtp.caller.local.host, 0) for stream in self.streams[cseq]]
return retval
def cleanup(self):
self.end_time = time()
for cseq in [self.previous_cseq, self.cseq]:
if cseq is not None:
for stream in self.streams[cseq]:
stream.cleanup()
def stream_expired(self, stream):
active_streams = set()
for cseq in [self.previous_cseq, self.cseq]:
if cseq is not None:
active_streams.update({stream for stream in self.streams[cseq] if stream.is_alive})
if len(active_streams) == 0:
self.manager.session_expired(self.call_id, self.from_tag)
@property
def duration(self):
if self.start_time is not None:
if self.end_time is not None:
return int(self.end_time - self.start_time)
else:
return int(time() - self.start_time)
else:
return 0
@property
def relayed_bytes(self):
- return sum(stream.counters.relayed_bytes for stream in set(chain(*self.streams.itervalues())))
+ return sum(stream.counters.relayed_bytes for stream in set(chain(*iter(self.streams.values()))))
@property
def statistics(self):
- all_streams = set(chain(*self.streams.itervalues()))
+ all_streams = set(chain(*iter(self.streams.values())))
attributes = ('call_id', 'from_tag', 'from_uri', 'to_tag', 'to_uri', 'start_time', 'duration')
stats = dict((name, getattr(self, name)) for name in attributes)
stats['caller_ua'] = self.caller_ua or 'Unknown'
stats['callee_ua'] = self.callee_ua or 'Unknown'
stats['streams'] = streams = []
stream_attributes = ('media_type', 'status', 'timeout_wait')
for stream in sorted(all_streams, key=attrgetter('start_time')): # type: MediaStream
info = dict((name, getattr(stream, name)) for name in stream_attributes)
info['caller_codec'] = stream.rtp.caller.codec
info['callee_codec'] = stream.rtp.callee.codec
if stream.start_time is None:
info['start_time'] = info['end_time'] = None
elif self.start_time is None:
info['start_time'] = info['end_time'] = 0
else:
info['start_time'] = max(int(stream.start_time - self.start_time), 0)
if stream.status == 'rejected':
info['end_time'] = info['start_time']
else:
if stream.end_time is None:
info['end_time'] = stats['duration']
else:
info['end_time'] = min(int(stream.end_time - self.start_time), self.duration)
if stream.first_media_time is None:
info['post_dial_delay'] = None
else:
info['post_dial_delay'] = stream.first_media_time - stream.create_time
caller = stream.rtp.caller
callee = stream.rtp.callee
info.update(stream.counters)
info['caller_local'] = str(caller.local)
info['callee_local'] = str(callee.local)
info['caller_remote'] = str(caller.remote)
info['callee_remote'] = str(callee.remote)
streams.append(info)
return stats
class SessionManager(Logger):
- implements(IReadDescriptor)
+ @implementer(IReadDescriptor)
def __init__(self, relay, start_port, end_port):
self.relay = relay
- self.ports = deque((i, i + 1) for i in xrange(start_port, end_port, 2))
+ self.ports = deque((i, i + 1) for i in range(start_port, end_port, 2))
self.bad_ports = deque()
self.sessions = {}
self.watcher = _conntrack.ExpireWatcher()
self.active_byte_counter = 0 # relayed byte counter for sessions active during last speed measurement
self.closed_byte_counter = 0 # relayed byte counter for sessions closed after last speed measurement
self.bps_relayed = 0
if RelayConfig.traffic_sampling_period > 0:
self.speed_calculator = RecurrentCall(RelayConfig.traffic_sampling_period, self._measure_speed)
else:
self.speed_calculator = None
reactor.addReader(self)
def _measure_speed(self):
start_time = time()
- current_byte_counter = sum(session.relayed_bytes for session in self.sessions.itervalues())
+ current_byte_counter = sum(session.relayed_bytes for session in self.sessions.values())
self.bps_relayed = 8 * (current_byte_counter + self.closed_byte_counter - self.active_byte_counter) / RelayConfig.traffic_sampling_period
self.active_byte_counter = current_byte_counter
self.closed_byte_counter = 0
us_taken = int((time() - start_time) * 1000000)
if us_taken > 10000:
log.warning('Aggregate speed calculation time exceeded 10ms: %d us for %d sessions' % (us_taken, len(self.sessions)))
return KeepRunning
# implemented for IReadDescriptor
def fileno(self):
return self.watcher.fd
def doRead(self):
stream = self.watcher.read()
if stream:
stream.expired_func()
def connectionLost(self, reason):
reactor.removeReader(self)
# port management
def get_ports(self):
if len(self.bad_ports) > len(self.ports):
log.debug('Excessive amount of bad ports, doing cleanup')
self.ports.extend(self.bad_ports)
self.bad_ports = deque()
try:
return self.ports.popleft()
except IndexError:
raise RelayPortsExhaustedError()
def set_bad_ports(self, ports):
self.bad_ports.append(ports)
def free_ports(self, ports):
self.ports.append(ports)
# called by higher level
def _find_session_key(self, call_id, from_tag, to_tag):
key_from = (call_id, from_tag)
if key_from in self.sessions:
return key_from
if to_tag:
key_to = (call_id, to_tag)
if key_to in self.sessions:
return key_to
return None
def has_session(self, call_id, from_tag, to_tag=None, **kw):
return any((call_id, tag) in self.sessions for tag in (from_tag, to_tag) if tag is not None)
def update_session(self, dispatcher, call_id, from_tag, from_uri, to_uri, cseq, user_agent, type, media=[], to_tag=None, **kw):
key = self._find_session_key(call_id, from_tag, to_tag)
if key:
session = self.sessions[key]
is_downstream = (session.from_tag != from_tag) ^ (type == 'request')
is_caller_cseq = (session.from_tag == from_tag)
session.update_media(cseq, to_tag, user_agent, media, is_downstream, is_caller_cseq)
elif type == 'reply' and not media:
return None
else:
is_downstream = type == 'request'
is_caller_cseq = True
session = Session(self, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media, is_downstream, is_caller_cseq)
self.sessions[(call_id, from_tag)] = session
self.relay.add_session(dispatcher)
return session.get_local_media(is_downstream, cseq, is_caller_cseq)
def remove_session(self, call_id, from_tag, to_tag=None, **kw):
key = self._find_session_key(call_id, from_tag, to_tag)
try:
session = self.sessions[key]
except KeyError:
log.warning('The dispatcher tried to remove a session which is no longer present on the relay')
return None
session.logger.info('removed')
session.cleanup()
self.closed_byte_counter += session.relayed_bytes
del self.sessions[key]
reactor.callLater(0, self.relay.remove_session, session.dispatcher)
return session
def session_expired(self, call_id, from_tag):
key = (call_id, from_tag)
try:
session = self.sessions[key]
except KeyError:
log.warning('A session expired but is no longer present on the relay')
return
session.logger.info('expired')
session.cleanup()
self.closed_byte_counter += session.relayed_bytes
del self.sessions[key]
self.relay.session_expired(session)
self.relay.remove_session(session.dispatcher)
def cleanup(self):
if self.speed_calculator is not None:
self.speed_calculator.cancel()
- for key in self.sessions.keys():
+ for key in list(self.sessions.keys()):
self.session_expired(*key)
@property
def statistics(self):
- return [session.statistics for session in self.sessions.itervalues()]
+ return [session.statistics for session in self.sessions.values()]
@property
def stream_count(self):
stream_count = {}
- for session in self.sessions.itervalues():
- for stream in set(chain(*session.streams.itervalues())):
+ for session in self.sessions.values():
+ for stream in set(chain(*iter(session.streams.values()))):
if stream.is_alive:
stream_count[stream.media_type] = stream_count.get(stream.media_type, 0) + 1
return stream_count
diff --git a/mediaproxy/relay.py b/mediaproxy/relay.py
index 05f7951..b1e1ffa 100644
--- a/mediaproxy/relay.py
+++ b/mediaproxy/relay.py
@@ -1,396 +1,413 @@
"""Implementation of the MediaProxy relay"""
-import cjson
+import json
import signal
import resource
+"""
try:
from twisted.internet import epollreactor; epollreactor.install()
except:
raise RuntimeError('mandatory epoll reactor support is not available from the twisted framework')
+"""
from application import log
from application.process import process
from gnutls.errors import CertificateError, CertificateSecurityError
from gnutls.interfaces.twisted import TLSContext
from time import time
from twisted.protocols.basic import LineOnlyReceiver
+from twisted.protocols.policies import TimeoutMixin
from twisted.internet.error import ConnectionDone, TCPTimedOutError, DNSLookupError
from twisted.internet.protocol import ClientFactory, connectionDone
from twisted.internet.defer import DeferredList, succeed
from twisted.internet import reactor
from twisted.python import failure
from twisted.names import dns
from twisted.names.client import lookupService
from twisted.names.error import DomainError
from mediaproxy import __version__
from mediaproxy.configuration import RelayConfig
from mediaproxy.headers import DecodingDict, DecodingError
from mediaproxy.mediacontrol import SessionManager, RelayPortsExhaustedError
from mediaproxy.scheduler import RecurrentCall, KeepRunning
from mediaproxy.tls import X509Credentials
# Increase the system limit for the maximum number of open file descriptors
# to be able to handle connections to all ports in port_range
+
fd_limit = RelayConfig.port_range.end - RelayConfig.port_range.start + 1000
try:
resource.setrlimit(resource.RLIMIT_NOFILE, (fd_limit, fd_limit))
except ValueError:
raise RuntimeError('Cannot set resource limit for maximum open file descriptors to %d' % fd_limit)
else:
new_limits = resource.getrlimit(resource.RLIMIT_NOFILE)
if new_limits < (fd_limit, fd_limit):
raise RuntimeError("Allocated resource limit for maximum open file descriptors is less then requested (%d instead of %d)" % (new_limits[0], fd_limit))
else:
log.info('Set resource limit for maximum open file descriptors to %d' % fd_limit)
-class RelayClientProtocol(LineOnlyReceiver):
+class RelayClientProtocol(LineOnlyReceiver, TimeoutMixin):
noisy = False
required_headers = {'update': {'call_id', 'from_tag', 'from_uri', 'to_uri', 'cseq', 'user_agent', 'type'},
'remove': {'call_id', 'from_tag'},
'summary': set(),
'sessions': set()}
def __init__(self):
self.command = None
self.seq = None
self.headers = DecodingDict()
self._connection_watcher = None
self._queued_keepalives = 0
def _send_keepalive(self):
if self._queued_keepalives >= 3:
log.error('missed 3 keepalive answers in a row. assuming the connection is down.')
# do not use loseConnection() as it waits to flush the output buffers.
reactor.callLater(0, self.transport.connectionLost, failure.Failure(TCPTimedOutError()))
return None
- self.transport.write('ping' + self.delimiter)
+ self.transport.write(b'ping' + self.delimiter)
self._queued_keepalives += 1
return KeepRunning
def reply(self, reply):
- self.transport.write(reply + self.delimiter)
+ log.debug(f"Send reply: {reply} to {self.transport.getPeer().host}:{self.transport.getPeer().port}")
+ self.transport.write(reply.encode() + self.delimiter)
def connectionMade(self):
peer = self.transport.getPeer()
log.info('Connected to dispatcher at %s:%d' % (peer.host, peer.port))
if RelayConfig.passport is not None:
peer_cert = self.transport.getPeerCertificate()
+ log.debug(f"peer {self.transport.getPeer().host}:{self.transport.getPeer().port} {peer_cert.subject}")
if not RelayConfig.passport.accept(peer_cert):
+ log.debug("Media dispatcher certificate %s refused" % peer_cert.subject)
self.transport.loseConnection()
self._connection_watcher = RecurrentCall(RelayConfig.keepalive_interval, self._send_keepalive)
def connectionLost(self, reason=connectionDone):
if self._connection_watcher is not None:
self._connection_watcher.cancel()
self._connection_watcher = None
self._queued_keepalives = 0
def lineReceived(self, line):
+ line = line.decode()
+ log.debug(f"Line received: {line} from {self.transport.getPeer().host}:{self.transport.getPeer().port}")
if line == 'pong':
self._queued_keepalives -= 1
return
if self.command is None:
try:
command, seq = line.split()
except ValueError:
log.error('Could not decode command/sequence number pair from dispatcher: %s' % line)
return
if command in self.required_headers:
self.command = command
self.seq = seq
self.headers = DecodingDict()
else:
log.error('Unknown command: %s' % command)
self.reply('{} error'.format(seq))
elif line == '':
missing_headers = self.required_headers[self.command].difference(self.headers)
if missing_headers:
for header in missing_headers:
log.error('Missing mandatory header %r from %r command' % (header, self.command))
response = 'error'
else:
# noinspection PyBroadException
try:
response = self.factory.parent.got_command(self.factory.host, self.command, self.headers)
except Exception:
log.exception()
response = 'error'
self.reply('{} {}'.format(self.seq, response))
self.command = None
else:
try:
name, value = line.split(": ", 1)
except ValueError:
log.error('Unable to parse header: %s' % line)
else:
try:
self.headers[name] = value
- except DecodingError, e:
+ except DecodingError as e:
log.error('Could not decode header: %s' % e)
class DispatcherConnectingFactory(ClientFactory):
noisy = False
protocol = RelayClientProtocol
def __init__(self, parent, host, port):
self.parent = parent
self.host = (host, port)
self.delayed = None
self.connection_lost = False
def __eq__(self, other):
return self.host == other.host
def clientConnectionFailed(self, connector, reason):
log.error('Could not connect to dispatcher at %(host)s:%(port)d (retrying in %%d seconds): %%s' % connector.__dict__ % (RelayConfig.reconnect_delay, reason.value))
if self.parent.connector_needs_reconnect(connector):
self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect)
def clientConnectionLost(self, connector, reason):
self.cancel_delayed()
- if reason.type != ConnectionDone:
- log.error('Connection with dispatcher at %(host)s:%(port)d was lost: %%s' % connector.__dict__ % reason.value)
- else:
- log.info('Connection with dispatcher at %(host)s:%(port)d was closed' % connector.__dict__)
+# if reason.type != ConnectionDone:
+ log.error('Connection with dispatcher at %(host)s:%(port)d was lost: %%s' % connector.__dict__ % reason.value)
+# else:
+# log.info('Connection with dispatcher at %(host)s:%(port)d was closed' % connector.__dict__)
if self.parent.connector_needs_reconnect(connector):
if isinstance(reason.value, CertificateError) or self.connection_lost:
self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect)
else:
self.delayed = reactor.callLater(min(RelayConfig.reconnect_delay, 1), connector.connect)
self.connection_lost = True
def buildProtocol(self, addr):
self.delayed = reactor.callLater(5, self._connected_successfully)
return ClientFactory.buildProtocol(self, addr)
def _connected_successfully(self):
+ log.debug('Connected successfully')
self.connection_lost = False
def cancel_delayed(self):
if self.delayed:
if self.delayed.active():
self.delayed.cancel()
self.delayed = None
class SRVMediaRelayBase(object):
def __init__(self):
self.shutting_down = False
self.srv_monitor = RecurrentCall(RelayConfig.dns_check_interval, self._do_lookup)
self._do_lookup()
def _do_lookup(self):
defers = []
for addr, port, is_domain in RelayConfig.dispatchers:
if is_domain:
defer = lookupService("_sip._udp.%s" % addr)
defer.addCallback(self._cb_got_srv, port)
defer.addErrback(self._eb_no_srv, addr, port)
defers.append(defer)
else:
defers.append(succeed((addr, port)))
defer = DeferredList(defers)
defer.addCallback(self._cb_got_all)
return KeepRunning
- def _cb_got_srv(self, (answers, auth, add), port):
+ def _cb_got_srv(self, answ_auth_add, port):
+ (answers, auth, add) = answ_auth_add
for answer in answers:
if answer.type == dns.SRV and answer.payload and answer.payload.target != dns.Name("."):
return str(answer.payload.target), port
raise DomainError
def _eb_no_srv(self, failure, addr, port):
failure.trap(DomainError)
return reactor.resolve(addr).addCallback(lambda host: (host, port)).addErrback(self._eb_no_dns, addr)
def _eb_no_dns(self, failure, addr):
failure.trap(DNSLookupError)
log.error("Could resolve neither SRV nor A record for '%s'" % addr)
def _cb_got_all(self, results):
if not self.shutting_down:
dispatchers = [result[1] for result in results if result[0] and result[1] is not None]
self.update_dispatchers(dispatchers)
def update_dispatchers(self, dispatchers):
raise NotImplementedError()
def run(self):
process.signals.add_handler(signal.SIGHUP, self._handle_signal)
process.signals.add_handler(signal.SIGINT, self._handle_signal)
process.signals.add_handler(signal.SIGTERM, self._handle_signal)
process.signals.add_handler(signal.SIGUSR1, self._handle_signal)
reactor.run(installSignalHandlers=False)
def stop(self, graceful=False):
reactor.callFromThread(self._shutdown, graceful=graceful)
def _handle_signal(self, signum, frame):
if signum == signal.SIGUSR1:
# toggle debugging
if log.level.current != log.level.DEBUG:
log.level.current = log.level.DEBUG
log.info('Switched logging level to DEBUG')
else:
log.info('Switched logging level to {}'.format(RelayConfig.log_level))
log.level.current = RelayConfig.log_level
else:
# terminate program
signal_map = {signal.SIGTERM: 'Terminated', signal.SIGINT: 'Interrupted', signal.SIGHUP: 'Graceful shutdown'}
log.info(signal_map.get(signum, 'Received signal {}, exiting.'.format(signum)))
self.stop(graceful=(signum == signal.SIGHUP))
def _shutdown(self, graceful=False):
raise NotImplementedError()
@staticmethod
def _shutdown_done():
reactor.stop()
try:
from mediaproxy.sipthor import SIPThorMediaRelayBase as MediaRelayBase
except ImportError:
MediaRelayBase = SRVMediaRelayBase
class MediaRelay(MediaRelayBase):
def __init__(self):
self.cred = X509Credentials(cert_name='relay')
self.tls_context = TLSContext(self.cred)
self.session_manager = SessionManager(self, RelayConfig.port_range.start, RelayConfig.port_range.end)
self.dispatchers = set()
self.dispatcher_session_count = {}
self.dispatcher_connectors = {}
self.old_connectors = {}
self.shutting_down = False
self.graceful_shutdown = False
self.start_time = time()
- super(MediaRelay, self).__init__()
+ super().__init__()
@property
def status(self):
if self.graceful_shutdown or self.shutting_down:
return 'halting'
else:
return 'active'
def update_dispatchers(self, dispatchers):
dispatchers = set(dispatchers)
for new_dispatcher in dispatchers.difference(self.dispatchers):
- if new_dispatcher in self.old_connectors.iterkeys():
+ if new_dispatcher in iter(self.old_connectors.keys()):
log.info('Restoring old dispatcher at %s:%d' % new_dispatcher)
self.dispatcher_connectors[new_dispatcher] = self.old_connectors.pop(new_dispatcher)
else:
log.info('Adding new dispatcher at %s:%d' % new_dispatcher)
dispatcher_addr, dispatcher_port = new_dispatcher
factory = DispatcherConnectingFactory(self, dispatcher_addr, dispatcher_port)
- self.dispatcher_connectors[new_dispatcher] = reactor.connectTLS(dispatcher_addr, dispatcher_port, factory, self.tls_context)
+ self.dispatcher_connectors[new_dispatcher] = reactor.connectTLS(dispatcher_addr,
+ dispatcher_port,
+ factory,
+ self.tls_context)
for old_dispatcher in self.dispatchers.difference(dispatchers):
try:
self.old_connectors[old_dispatcher] = self.dispatcher_connectors.pop(old_dispatcher)
except KeyError:
pass
else:
log.info('Removing old dispatcher at %s:%d' % old_dispatcher)
self._check_disconnect(old_dispatcher)
self.dispatchers = dispatchers
def got_command(self, dispatcher, command, headers):
if command == 'summary':
summary = {'ip': RelayConfig.relay_ip,
'version': __version__,
'status': self.status,
'uptime': int(time() - self.start_time),
'session_count': len(self.session_manager.sessions),
'stream_count': self.session_manager.stream_count,
'bps_relayed': self.session_manager.bps_relayed}
- return cjson.encode(summary)
+ return json.dumps(summary)
elif command == 'sessions':
- return cjson.encode(self.session_manager.statistics)
+ return json.dumps(self.session_manager.statistics)
elif command == 'update':
if self.graceful_shutdown or self.shutting_down:
if not self.session_manager.has_session(**headers):
log.info('cannot add new session: media-relay is shutting down')
return 'halting'
try:
local_media = self.session_manager.update_session(dispatcher, **headers)
except RelayPortsExhaustedError:
log.error('Could not reserve relay ports for session, all allocated ports are being used')
return 'error'
if local_media:
return ' '.join([RelayConfig.advertised_ip or local_media[0][0]] + [str(media[1]) for media in local_media])
else: # command == 'remove'
session = self.session_manager.remove_session(**headers)
if session is None:
return 'error'
else:
- return cjson.encode(session.statistics)
+ return json.dumps(session.statistics)
def session_expired(self, session):
connector = self.dispatcher_connectors.get(session.dispatcher)
if connector is None:
connector = self.old_connectors.get(session.dispatcher)
if connector and connector.state == 'connected':
- connector.transport.write(' '.join(['expired', cjson.encode(session.statistics)]) + connector.factory.protocol.delimiter)
+# connector.transport.write(' '.join(['expired', json.dumps(session.statistics)]) + connector.factory.protocol.delimiter)
+ to_write = [elem.encode() for elem in ['expired', json.dumps(session.statistics)]]
+ connector.transport.write(connector.factory.protocol.delimiter.join(to_write) + connector.factory.protocol.delimiter)
+
else:
log.warning('dispatcher for expired session is no longer online, statistics are lost!')
def add_session(self, dispatcher):
self.dispatcher_session_count[dispatcher] = self.dispatcher_session_count.get(dispatcher, 0) + 1
def remove_session(self, dispatcher):
self.dispatcher_session_count[dispatcher] -= 1
if self.dispatcher_session_count[dispatcher] == 0:
del self.dispatcher_session_count[dispatcher]
if self.graceful_shutdown and not self.dispatcher_session_count:
self._shutdown()
elif dispatcher in self.old_connectors:
self._check_disconnect(dispatcher)
def _check_disconnect(self, dispatcher):
connector = self.old_connectors[dispatcher]
if self.dispatcher_session_count.get(dispatcher, 0) == 0:
old_state = connector.state
connector.factory.cancel_delayed()
connector.disconnect()
if old_state == "disconnected":
del self.old_connectors[dispatcher]
if self.shutting_down and len(self.dispatcher_connectors) + len(self.old_connectors) == 0:
self._shutdown_done()
def connector_needs_reconnect(self, connector):
- if connector in self.dispatcher_connectors.values():
+ if connector in list(self.dispatcher_connectors.values()):
return True
else:
- for dispatcher, old_connector in self.old_connectors.items():
+ for dispatcher, old_connector in list(self.old_connectors.items()):
if old_connector is connector:
if self.dispatcher_session_count.get(dispatcher, 0) > 0:
return True
else:
del self.old_connectors[dispatcher]
break
if self.shutting_down:
if len(self.old_connectors) == 0:
self._shutdown_done()
return False
def _shutdown(self, graceful=False):
if graceful:
self.graceful_shutdown = True
if self.dispatcher_session_count:
return
if not self.shutting_down:
self.shutting_down = True
self.srv_monitor.cancel()
self.session_manager.cleanup()
if len(self.dispatcher_connectors) + len(self.old_connectors) == 0:
self._shutdown_done()
else:
self.update_dispatchers([])
diff --git a/mediaproxy/scheduler.py b/mediaproxy/scheduler.py
index dfa4a3d..023716a 100644
--- a/mediaproxy/scheduler.py
+++ b/mediaproxy/scheduler.py
@@ -1,46 +1,46 @@
"""Schedule calls on the twisted reactor"""
__all__ = ['RecurrentCall', 'KeepRunning']
from time import time
class KeepRunning:
"""Return this class from a recurrent function to indicate that it should keep running"""
pass
class RecurrentCall(object):
"""Execute a function repeatedly at the given interval, until signaled to stop"""
def __init__(self, period, func, *args, **kwargs):
from twisted.internet import reactor
self.func = func
self.args = args
self.kwargs = kwargs
self.period = period
self.now = None
self.next = None
self.callid = reactor.callLater(period, self)
def __call__(self):
from twisted.internet import reactor
self.callid = None
if self.now is None:
self.now = time()
self.next = self.now + self.period
else:
self.now, self.next = self.next, self.next + self.period
result = self.func(*self.args, **self.kwargs)
if result is KeepRunning:
- delay = max(self.next-time(), 0)
+ delay = max(self.next - time(), 0)
self.callid = reactor.callLater(delay, self)
def cancel(self):
if self.callid is not None:
try:
self.callid.cancel()
except ValueError:
pass
self.callid = None
diff --git a/mediaproxy/tls.py b/mediaproxy/tls.py
index f0e7518..b3ba24b 100644
--- a/mediaproxy/tls.py
+++ b/mediaproxy/tls.py
@@ -1,89 +1,91 @@
"""TLS support"""
__all__ = ['X509Credentials']
import os
import stat
from application.process import process
from gnutls import crypto
from gnutls.interfaces import twisted
from mediaproxy.configuration import TLSConfig
class FileDescriptor(object):
def __init__(self, name, type):
certs_path = os.path.normpath(TLSConfig.certs_path)
+# print(f"Tls config from {certs_path}")
self.path = os.path.join(certs_path, name)
self.klass = type
self.timestamp = 0
self.object = None
def get(self):
path = process.configuration.file(self.path)
if path is None:
raise RuntimeError('missing or unreadable file: %s' % self.path)
mtime = os.stat(path)[stat.ST_MTIME]
if self.timestamp < mtime:
f = open(path)
try:
self.object = self.klass(f.read())
self.timestamp = mtime
finally:
f.close()
return self.object
class X509Entity(object):
type = None
def __init__(self, name_attr):
self.name_attr = name_attr
self.descriptors = {}
def __get__(self, obj, type_=None):
name = getattr(obj or type_, self.name_attr, None)
if name is None:
return None
descriptor = self.descriptors.setdefault(name, FileDescriptor(name, self.type))
return descriptor.get()
def __set__(self, obj, value):
raise AttributeError('cannot set attribute')
def __delete__(self, obj):
raise AttributeError('cannot delete attribute')
class X509Certificate(X509Entity):
type = crypto.X509Certificate
class X509PrivateKey(X509Entity):
type = crypto.X509PrivateKey
class X509CRL(X509Entity):
type = crypto.X509CRL
class X509Credentials(twisted.X509Credentials):
"""SIPThor X509 credentials"""
X509cert_name = None # will be defined by each instance
X509key_name = None # will be defined by each instance
X509ca_name = 'ca.pem'
X509crl_name = 'crl.pem'
X509cert = X509Certificate(name_attr='X509cert_name')
X509key = X509PrivateKey(name_attr='X509key_name')
X509ca = X509Certificate(name_attr='X509ca_name')
X509crl = X509CRL(name_attr='X509crl_name')
def __init__(self, cert_name):
self.X509cert_name = '%s.crt' % cert_name
self.X509key_name = '%s.key' % cert_name
+# print(f"cert file called {cert_name}")
twisted.X509Credentials.__init__(self, self.X509cert, self.X509key, [self.X509ca], [self.X509crl])
self.verify_peer = True
self.verify_period = TLSConfig.verify_interval
diff --git a/setup.py b/setup.py
index e2a8d2f..e6bd61d 100755
--- a/setup.py
+++ b/setup.py
@@ -1,54 +1,54 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
import re
import sys
import mediaproxy
from distutils.core import setup, Extension
# Get the title and description from README
readme = open('README').read()
title, description = re.findall(r'^\s*([^\n]+)\s+(.*)$', readme, re.DOTALL)[0]
# media-relay is not supported on non-linux platforms
#
-if sys.platform == 'linux2':
+if 'linux' in sys.platform:
scripts = ['media-relay', 'media-dispatcher']
ext_modules = [Extension(name='mediaproxy.interfaces.system._conntrack',
sources=['mediaproxy/interfaces/system/_conntrack.c'],
libraries=['netfilter_conntrack', 'ip4tc'],
define_macros=[('MODULE_VERSION', mediaproxy.__version__)])]
else:
print('WARNING: skipping the media relay component as this is a non-linux platform')
scripts = ['media-dispatcher']
ext_modules = []
setup(
name='mediaproxy',
version=mediaproxy.__version__,
description=title,
long_description=description,
url='http://www.ag-projects.com/MediaProxy.html',
author='AG Projects',
author_email='support@ag-projects.com',
license='GPLv2',
platforms=['Linux'],
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Service Providers',
'License :: GNU General Public License (GPLv2)',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python',
'Programming Language :: C'
],
packages=['mediaproxy', 'mediaproxy.configuration', 'mediaproxy.interfaces', 'mediaproxy.interfaces.accounting', 'mediaproxy.interfaces.system'],
data_files=[('/etc/mediaproxy', ['config.ini.sample']), ('/etc/mediaproxy/radius', ['radius/dictionary']), ('/etc/mediaproxy/tls', ['tls/README'])],
scripts=scripts,
ext_modules=ext_modules
)
diff --git a/test/common.py b/test/common.py
index 928914f..f5f6317 100644
--- a/test/common.py
+++ b/test/common.py
@@ -1,215 +1,217 @@
# Copyright (C) 2008 AG Projects
#
import sys; sys.path.extend(['.', '..'])
import os
import random
import string
import struct
import mediaproxy
from application.configuration import *
from application.process import process
from application.system import host
from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, succeed
from twisted.internet.protocol import DatagramProtocol, ClientFactory
from twisted.internet.task import LoopingCall
from twisted.protocols.basic import LineOnlyReceiver
from mediaproxy.headers import EncodingDict
process.configuration.user_directory = None
process.configuration.subdirectory = mediaproxy.mediaproxy_subdirectory
class Config(ConfigSection):
__cfgfile__ = mediaproxy.configuration_file
__section__ = 'Dispatcher'
socket = '/run/mediaproxy/dispatcher.sock'
random_data = os.urandom(512)
stun_data = struct.pack('!HHIIII', 0x0001, 0, 0x2112A442, 0, 0, 0)
default_host_ip = host.default_ip
class OpenSIPSControlClientProtocol(LineOnlyReceiver):
def __init__(self):
self.defer = None
def lineReceived(self, line):
+ line = line.decode()
if line == 'error':
print('got error from dispatcher!')
reactor.stop()
elif self.defer is not None:
- print('got ip/ports from dispatcher: %s' % line)
+ print(('got ip/ports from dispatcher: %s' % line))
ip, ports = line.split(' ', 1)
defer = self.defer
self.defer = None
defer.callback((ip, [int(i) for i in ports.split()]))
else:
- print('got reply from dispatcher: %s' % line)
+ print(('got reply from dispatcher: %s' % line))
defer = self.defer
self.defer = None
defer.callback(line)
def _send_command(self, command, headers):
self.defer = Deferred()
- data = self.delimiter.join([command] + ['%s: %s' % item for item in headers.iteritems()]) + 2*self.delimiter
- # print('writing on socket:\n%s' % data)
- self.transport.write(data)
+ data = self.delimiter.decode().join([command] + ['%s: %s' % item for item in headers.items()]) + 2 * self.delimiter.decode()
+ print('writing on socket:\n%s' % data)
+ self.transport.write(data.encode())
return self.defer
def update(self, **kw_args):
return self._send_command('update', EncodingDict(kw_args))
def remove(self, **kw_args):
return self._send_command('remove', EncodingDict(kw_args))
class OpenSIPSConnectorFactory(ClientFactory):
protocol = OpenSIPSControlClientProtocol
def __init__(self):
self.defer = Deferred()
def buildProtocol(self, addr):
prot = ClientFactory.buildProtocol(self, addr)
reactor.callLater(0, self.defer.callback, prot)
return prot
class MediaReceiverProtocol(DatagramProtocol):
def __init__(self, endpoint, index):
self.endpoint = endpoint
self.index = index
self.loop = None
self.received_media = False
self.defer = Deferred()
- def datagramReceived(self, data, (host, port)):
+ def datagramReceived(self, data, addr):
+ (host, port) = addr
if not self.received_media:
self.received_media = True
- print('received media %d for %s from %s:%d' % (self.index, self.endpoint.name, host, port))
+ print(('received media %d for %s from %s:%d' % (self.index, self.endpoint.name, host, port)))
self.defer.callback(None)
def connectionRefused(self):
- print('connection refused for media %d for %s' % (self.index, self.endpoint.name))
+ print(('connection refused for media %d for %s' % (self.index, self.endpoint.name)))
class Endpoint(object):
def __init__(self, sip_uri, user_agent, is_caller):
if is_caller:
self.name = 'caller'
else:
self.name = 'callee'
self.sip_uri = sip_uri
self.user_agent = user_agent
self.tag = ''.join(random.sample(string.ascii_lowercase, 8))
self.connectors = []
self.media = []
self.cseq = 1
def set_media(self, media):
assert(len(self.connectors) == 0)
self.media = media
for index, (media_type, port, direction, parameters) in enumerate(self.media):
if port != 0:
protocol = MediaReceiverProtocol(self, index)
connector = reactor.listenUDP(port, protocol)
else:
connector = None
self.connectors.append(connector)
return DeferredList([connector.protocol.defer for connector in self.connectors if connector is not None])
def get_media(self, use_old_hold):
if use_old_hold:
ip = '0.0.0.0'
else:
ip = default_host_ip
return [(media_type, ip, port, direction, parameters) for media_type, port, direction, parameters in self.media]
def start_media(self, ip, ports, send_stun=False):
for port, connector in zip(ports, self.connectors):
if connector is not None:
protocol = connector.protocol
if port != 0:
protocol.transport.connect(ip, port)
protocol.loop = LoopingCall(protocol.transport.write, send_stun and stun_data or random_data)
protocol.loop.start(random.uniform(0.5, 1))
else:
protocol.defer.callback(None)
def stop_media(self):
defers = []
for connector in self.connectors:
if connector is not None:
if connector.protocol.loop is not None:
connector.protocol.loop.stop()
connector.protocol.loop = None
defer = connector.stopListening()
if defer is not None:
defers.append(defer)
self.connectors = []
if defers:
return DeferredList(defers)
else:
return succeed(None)
class Session(object):
def __init__(self, caller, callee):
self.caller = caller
self.callee = callee
self.call_id = ''.join(random.sample(string.ascii_letters, 24))
def _get_parties(self, party):
party = getattr(self, party)
if party is self.caller:
other = self.callee
else:
other = self.caller
return party, other
def do_update(self, opensips, party, type, is_final, use_old_hold=False):
party, other = self._get_parties(party)
if type == 'request':
from_tag = party.tag
to_tag = other.tag
from_uri = party.sip_uri
to_uri = other.sip_uri
cseq = party.cseq
else:
from_tag = other.tag
to_tag = party.tag
from_uri = other.sip_uri
to_uri = party.sip_uri
cseq = other.cseq
if is_final:
defer = opensips.update(call_id=self.call_id, from_tag=from_tag, to_tag=to_tag, from_uri=from_uri, to_uri=to_uri, cseq=cseq, user_agent=party.user_agent, media=party.get_media(use_old_hold), type=type, dialog_id='1234567890')
else:
defer = opensips.update(call_id=self.call_id, from_tag=from_tag, to_tag=to_tag, from_uri=from_uri, to_uri=to_uri, cseq=cseq, user_agent=party.user_agent, media=party.get_media(use_old_hold), type=type, dialog_id='1234567890')
if is_final:
if type == 'request':
party.cseq += 1
else:
other.cseq += 1
return defer
def do_remove(self, opensips, party):
party, other = self._get_parties(party)
opensips.remove(call_id=self.call_id, from_tag=party.tag, to_tag=other.tag)
def connect_to_dispatcher():
factory = OpenSIPSConnectorFactory()
connector = reactor.connectUNIX(Config.socket, factory)
return connector, factory.defer
diff --git a/test/holdtest1.py b/test/holdtest1.py
index ccdcc24..4278f5b 100755
--- a/test/holdtest1.py
+++ b/test/holdtest1.py
@@ -1,126 +1,128 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a session that starts with 1 audio stream, then gets put
on hold by the caller for 5 minutes, the gets taken out of hold again. This
test uses the newer 'sendonly' direction attribute to indicate hold status.
"""
from common import *
def phase1(protocol, session):
print('setting up audio stream')
caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})])
callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = succeed(None)
defer.addCallback(caller_update, protocol, session, media_defer, phase2)
return defer
def phase2(result, protocol, session):
print('setting stream on hold')
session.caller.set_media([('audio', 40000, 'sendonly', {})])
session.callee.set_media([('audio', 30000, 'recvonly', {})])
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update_hold, protocol, session)
return defer
def callee_update_hold(result, protocol, session):
print('updating hold for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(wait_hold, protocol, session)
return defer
def wait_hold(result, protocol, session):
print('on hold, waiting 5 minutes...')
defer = Deferred()
defer.addCallback(stop_media_hold, protocol, session)
reactor.callLater(300, defer.callback, None)
return defer
def stop_media_hold(result, protocol, session):
print('stopping media for hold')
defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()])
defer.addCallback(phase3, protocol, session)
return defer
def phase3(result, protocol, session):
print('continuing audio stream')
caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})])
callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = succeed(None)
defer.addCallback(caller_update, protocol, session, media_defer, kthxbye)
return defer
def caller_update(result, protocol, session, media_defer, do_after):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, media_defer, do_after)
return defer
def callee_update(callee_addr, protocol, session, media_defer, do_after):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, media_defer, do_after)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, media_defer, do_after):
+def do_media(caller_addr, callee_addr, protocol, session, media_defer, do_after):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
media_defer.addCallback(wait, protocol, session, do_after)
return media_defer
def wait(result, protocol, session, do_after):
print('got media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(stop_media, protocol, session, do_after)
reactor.callLater(5, defer.callback, None)
return defer
def stop_media(result, protocol, session, do_after):
print('stopping media')
defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()])
defer.addCallback(do_after, protocol, session)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'caller')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
callee = Endpoint('Bob ', 'Callee UA', False)
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(phase1, session)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/holdtest2.py b/test/holdtest2.py
index 6f27731..0a2d2ea 100755
--- a/test/holdtest2.py
+++ b/test/holdtest2.py
@@ -1,126 +1,128 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a session that starts with 1 audio stream, then gets put
on hold by the caller for 5 minutes, the gets taken out of hold again. This
test uses the older 0.0.0.0 IP address to indicate hold status.
"""
from common import *
def phase1(protocol, session):
print('setting up audio stream')
caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})])
callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = succeed(None)
defer.addCallback(caller_update, protocol, session, media_defer, phase2)
return defer
def phase2(result, protocol, session):
print('setting stream on hold')
session.caller.set_media([('audio', 40000, 'sendrecv', {})])
session.callee.set_media([('audio', 30000, 'sendrecv', {})])
defer = session.do_update(protocol, 'caller', 'request', False, True)
defer.addCallback(callee_update_hold, protocol, session)
return defer
def callee_update_hold(result, protocol, session):
print('updating hold for callee')
defer = session.do_update(protocol, 'callee', 'reply', True, True)
defer.addCallback(wait_hold, protocol, session)
return defer
def wait_hold(result, protocol, session):
print('on hold, waiting 5 minutes...')
defer = Deferred()
defer.addCallback(stop_media_hold, protocol, session)
reactor.callLater(300, defer.callback, None)
return defer
def stop_media_hold(result, protocol, session):
print('stopping media for hold')
defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()])
defer.addCallback(phase3, protocol, session)
return defer
def phase3(result, protocol, session):
print('continuing audio stream')
caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})])
callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = succeed(None)
defer.addCallback(caller_update, protocol, session, media_defer, kthxbye)
return defer
def caller_update(result, protocol, session, media_defer, do_after):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, media_defer, do_after)
return defer
def callee_update(callee_addr, protocol, session, media_defer, do_after):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, media_defer, do_after)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, media_defer, do_after):
+def do_media(caller_addr, callee_addr, protocol, session, media_defer, do_after):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
media_defer.addCallback(wait, protocol, session, do_after)
return media_defer
def wait(result, protocol, session, do_after):
print('got media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(stop_media, protocol, session, do_after)
reactor.callLater(5, defer.callback, None)
return defer
def stop_media(result, protocol, session, do_after):
print('stopping media')
defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()])
defer.addCallback(do_after, protocol, session)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'caller')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
callee = Endpoint('Bob ', 'Callee UA', False)
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(phase1, session)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/holdtest3.py b/test/holdtest3.py
index a0734fb..517b01a 100755
--- a/test/holdtest3.py
+++ b/test/holdtest3.py
@@ -1,103 +1,105 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a session that starts with 1 audio stream, then gets put
on hold by the caller and stops without a BYE after 10 seconds. It is meant to
test the on hold timeout.
"""
from common import *
def phase1(protocol, session):
print('setting up audio stream')
caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})])
callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = succeed(None)
defer.addCallback(caller_update, protocol, session, media_defer, phase2)
return defer
def phase2(result, protocol, session):
print('setting stream on hold')
session.caller.set_media([('audio', 40000, 'sendonly', {})])
session.callee.set_media([('audio', 30000, 'recvonly', {})])
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update_hold, protocol, session)
return defer
def callee_update_hold(result, protocol, session):
print('updating hold for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(wait_hold, protocol, session)
return defer
def wait_hold(result, protocol, session):
print('on hold, waiting 10 seconds...')
defer = Deferred()
reactor.callLater(10, defer.callback, None)
return defer
def caller_update(result, protocol, session, media_defer, do_after):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, media_defer, do_after)
return defer
def callee_update(callee_addr, protocol, session, media_defer, do_after):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, media_defer, do_after)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, media_defer, do_after):
+def do_media(caller_addr, callee_addr, protocol, session, media_defer, do_after):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
media_defer.addCallback(wait, protocol, session, do_after)
return media_defer
def wait(result, protocol, session, do_after):
print('got media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(stop_media, protocol, session, do_after)
reactor.callLater(5, defer.callback, None)
return defer
def stop_media(result, protocol, session, do_after):
print('stopping media')
defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()])
defer.addCallback(do_after, protocol, session)
return defer
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
callee = Endpoint('Bob ', 'Callee UA', False)
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(phase1, session)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/icetest1.py b/test/icetest1.py
index 3815444..9e4193c 100755
--- a/test/icetest1.py
+++ b/test/icetest1.py
@@ -1,96 +1,98 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2009 AG Projects
#
"""
This test simulates a call flow with ICE where the relay is NOT selected as a candidate:
- The caller sends an INVITE
- the callee sends a 200 OK
- Both parties will send probing STUN requests for a few seconds
- Both parties will stop the probes and not send media through the relay
- After 4 minutes, the callee will send a BYE
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_stun, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_stun((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_stun(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting STUN probes for both parties')
session.caller.start_media(caller_ip, caller_ports, send_stun=True)
session.callee.start_media(callee_ip, callee_ports, send_stun=True)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait_stun, session, protocol)
return defer
def wait_stun(result, session, protocol):
print('got STUN probes, waiting 3 seconds')
defer = Deferred()
defer.addCallback(stop_stun_caller, session, protocol)
reactor.callLater(3, defer.callback, None)
return defer
def stop_stun_caller(result, session, protocol):
print('stopping STUN probes for caller')
defer = session.caller.stop_media()
defer.addCallback(stop_stun_callee, session, protocol)
return defer
def stop_stun_callee(result, session, protocol):
print('stopping STUN probes for callee')
defer = session.callee.stop_media()
defer.addCallback(wait_end, session, protocol)
return defer
def wait_end(result, session, protocol):
print('media is flowing via a different path than the relay for 4 minutes')
defer = Deferred()
defer.addCallback(end, session, protocol)
reactor.callLater(240, defer.callback, None)
return defer
def end(result, session, protocol):
print('sending remove')
return session.do_remove(protocol, 'callee')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40000, 'sendrecv', {'ice': 'yes'})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30000, 'sendrecv', {'ice': 'yes'})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/icetest2.py b/test/icetest2.py
index 1e13d5d..d821652 100755
--- a/test/icetest2.py
+++ b/test/icetest2.py
@@ -1,115 +1,119 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2009 AG Projects
#
"""
This test simulates a call flow with ICE where the relay is selected as a candidate:
- The caller sends an INVITE
- the callee sends a 200 OK
- Both parties will send probing STUN requests for a few seconds
- Both parties will stop the probes and start sending media through the relay
(Note that a re-INVITE will be sent, this is due to a limitatin in the test framework)
- After 5 seconds, the caller will send a BYE
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting STUN probes for both parties')
session.caller.start_media(caller_ip, caller_ports, send_stun=True)
session.callee.start_media(callee_ip, callee_ports, send_stun=True)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session)
return defer
def wait(result, protocol, session):
print('got STUN, waiting 5 seconds')
defer = Deferred()
defer.addCallback(stop_media, protocol, session)
reactor.callLater(5, defer.callback, None)
return defer
def stop_media(result, protocol, session):
print('stopping STUN probes')
defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()])
defer.addCallback(change_callee, protocol, session)
return defer
def change_callee(result, protocol, session):
print('sending new update for callee')
caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {'ice': 'yes'})])
callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {'ice': 'yes'})])
media_defer = DeferredList([caller_media, callee_media])
defer = session.do_update(protocol, 'callee', 'request', False)
defer.addCallback(change_caller, protocol, session, media_defer)
return defer
-def change_caller((caller_ip, caller_ports), protocol, session, media_defer):
+def change_caller(caller_addr, protocol, session, media_defer):
+ (caller_ip, caller_ports) = caller_addr
print('sending new update for caller')
defer = session.do_update(protocol, 'caller', 'reply', True)
defer.addCallback(start_new_media, protocol, session, media_defer, caller_ip, caller_ports)
return defer
-def start_new_media((callee_ip, callee_ports), protocol, session, media_defer, caller_ip, caller_ports):
+def start_new_media(callee_addr, protocol, session, media_defer, caller_ip, caller_ports):
+ (callee_ip, callee_ports) = callee_addr
print('starting media')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
media_defer.addCallback(wait2, protocol, session)
return media_defer
def wait2(result, protocol, session):
print('got media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(5, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'caller')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40000, 'sendrecv', {'ice': 'yes'})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30000, 'sendrecv', {'ice': 'yes'})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/multitest1.py b/test/multitest1.py
index f1ffe13..6a378b5 100755
--- a/test/multitest1.py
+++ b/test/multitest1.py
@@ -1,74 +1,77 @@
-#!/usr/bin/python2
+
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a normal call flow:
- The caller sends an INVITE
- the callee sends a 200 OK
- Both parties will start sending media
- Media will flow for 5 seconds
- The callee will send a BYE
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session)
return defer
def wait(result, protocol, session):
print('got media, waiting 30 seconds')
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(30, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'callee')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/multitest2.py b/test/multitest2.py
index e29352b..34be241 100755
--- a/test/multitest2.py
+++ b/test/multitest2.py
@@ -1,74 +1,77 @@
-#!/usr/bin/python2
+
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a normal call flow:
- The caller sends an INVITE
- the callee sends a 200 OK
- Both parties will start sending media
- Media will flow for 5 seconds
- The callee will send a BYE
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session)
return defer
def wait(result, protocol, session):
print('got media, waiting 35 seconds')
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(35, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'callee')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40001, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30001, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/multitest3.py b/test/multitest3.py
index bb98f89..828227d 100755
--- a/test/multitest3.py
+++ b/test/multitest3.py
@@ -1,74 +1,76 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a normal call flow:
- The caller sends an INVITE
- the callee sends a 200 OK
- Both parties will start sending media
- Media will flow for 5 seconds
- The callee will send a BYE
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session)
return defer
def wait(result, protocol, session):
print('got media, waiting 25 seconds')
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(25, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'callee')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40002, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30002, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/multitest4.py b/test/multitest4.py
index d35752f..51d57cf 100755
--- a/test/multitest4.py
+++ b/test/multitest4.py
@@ -1,74 +1,76 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a normal call flow:
- The caller sends an INVITE
- the callee sends a 200 OK
- Both parties will start sending media
- Media will flow for 5 seconds
- The callee will send a BYE
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session)
return defer
def wait(result, protocol, session):
print('got media, waiting 40 seconds')
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(40, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'callee')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40004, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30004, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/setuptest1.py b/test/setuptest1.py
index 048fd9b..f1cd65b 100755
--- a/test/setuptest1.py
+++ b/test/setuptest1.py
@@ -1,39 +1,39 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test scenario simulates the caller sending an INVITE, nothing is
received in return. The relay should discard the session after a while.
"""
from common import *
def caller_update(protocol, session):
print('doing update for caller')
return session.do_update(protocol, 'caller', 'request', False)
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller.set_media([('audio', 40000, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee.set_media([('audio', 30000, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/setuptest2.py b/test/setuptest2.py
index 1cd1f7a..f590902 100755
--- a/test/setuptest2.py
+++ b/test/setuptest2.py
@@ -1,74 +1,76 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a normal call flow:
- The caller sends an INVITE
- the callee sends a 200 OK
- Both parties will start sending media
- Media will flow for 5 seconds
- The callee will send a BYE
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session)
return defer
def wait(result, protocol, session):
print('got media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(5, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'callee')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/setuptest3.py b/test/setuptest3.py
index 76934c6..ca52840 100755
--- a/test/setuptest3.py
+++ b/test/setuptest3.py
@@ -1,74 +1,76 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a normal call flow without a BYE:
- The caller sends an INVITE
- the callee sends a 200 OK
- Both parties will start sending media
- Media will flow for 5 seconds
- Both parties will stop sending media
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = callee_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session)
return defer
def wait(result, protocol, session):
print('got media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(stop_media, protocol, session)
reactor.callLater(5, defer.callback, None)
return defer
def stop_media(result, protocol, session):
print('stopping media for callee')
return session.callee.stop_media()
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/setuptest4.py b/test/setuptest4.py
index d8c6228..f17784b 100755
--- a/test/setuptest4.py
+++ b/test/setuptest4.py
@@ -1,85 +1,88 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a normal call flow, with an added ACK confirming
the SDP:
- The caller sends an INVITE
- the callee sends a 200 OK
- Both parties will start sending media
- the caller sends an ACK with SDP
- Media will flow for 5 seconds
- The callee will send a BYE
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', False)
defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(caller_ack, protocol, session, callee_ip, callee_ports)
return defer
def caller_ack(result, protocol, session, callee_ip, callee_ports):
print('got media, doing ACK for caller')
defer = session.do_update(protocol, 'caller', 'request', True)
defer.addCallback(wait, protocol, session, callee_ip, callee_ports)
return defer
-def wait((callee_ack_ip, callee_ack_ports), protocol, session, callee_ip, callee_ports):
+def wait(callee_ack_addr, protocol, session, callee_ip, callee_ports):
+ (callee_ack_ip, callee_ack_ports) = callee_ack_addr
print('waiting 5 seconds')
assert (callee_ack_ip == callee_ip)
assert (callee_ack_ports == callee_ports)
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(5, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'callee')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/setuptest5.py b/test/setuptest5.py
index b82a708..e18382c 100755
--- a/test/setuptest5.py
+++ b/test/setuptest5.py
@@ -1,74 +1,76 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates call setup where no SDP is sent in the INVITE:
- the callee sends a 200 OK
- the caller sends a ACK with SDP
- Both parties will start sending media
- Media will flow for 5 seconds
- The callee will send a BYE
"""
from common import *
def callee_update(protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', False)
defer.addCallback(caller_update, protocol, session, caller_media, callee_media)
return defer
def caller_update(caller_addr, protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', True)
defer.addCallback(do_media, caller_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((callee_ip, callee_ports), (caller_ip, caller_ports), protocol, session, caller_media, callee_media):
+def do_media(callee_addr, caller_addr, protocol, session, caller_media, callee_media):
+ (callee_ip, callee_ports) = callee_addr
+ (caller_ip, caller_ports) = caller_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session)
return defer
def wait(result, protocol, session):
print('got media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(5, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'callee')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(callee_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/setuptest6.py b/test/setuptest6.py
index dea99c5..55acdbb 100755
--- a/test/setuptest6.py
+++ b/test/setuptest6.py
@@ -1,88 +1,90 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a session in which the caller proposes 3 streams in the
INVITE and the callee rejects two of these.
- The caller sends an INVITE with 1 video stream and 2 audio streams
- the callee sends a 200 OK with the ports for two of the streams set to 0
- Both parties start sending media
- Media flows for 5 seconds
- The callee sends a BYE
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(change_caller1, callee_addr, protocol, session, caller_media, callee_media)
return defer
def change_caller1(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
print('stopping media for caller')
defer = session.caller.stop_media()
defer.addCallback(change_caller2, caller_addr, callee_addr, protocol, session, callee_media)
return defer
def change_caller2(result, caller_addr, callee_addr, protocol, session, callee_media):
print('setting new media for caller')
caller_media = caller.set_media([('audio', 0, 'sendrecv', {}), ('video', 0, 'sendrecv', {}), ('audio', 40020, 'sendrecv', {})])
return do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media)
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session)
return defer
def wait(result, protocol, session):
print('got media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(5, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'callee')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40000, 'sendrecv', {}), ('video', 40010, 'sendrecv', {}), ('audio', 40020, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 0, 'sendrecv', {}), ('video', 0, 'sendrecv', {}), ('audio', 30020, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/setuptest7.py b/test/setuptest7.py
index fa7ccc5..b1b242e 100755
--- a/test/setuptest7.py
+++ b/test/setuptest7.py
@@ -1,72 +1,74 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a normal call flow:
- The caller sends an INVITE with a media stream with port=0
- The callee sends a 200 OK
- The callee will send a BYE after 5 seconds
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session)
return defer
def wait(result, protocol, session):
print('got media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(5, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'callee')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 0, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 40000, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/updatetest1.py b/test/updatetest1.py
index 344dd05..4000ff1 100755
--- a/test/updatetest1.py
+++ b/test/updatetest1.py
@@ -1,110 +1,113 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a call setup with an updated reply from the callee:
- The caller sends an INVITE
- The callee replies with .e.g a 183
- Both parties start sending media
- Media flows for 5 seconds
- Media stops
- The callee sends a 200 OK with a new port
- Media flows again for 5 seconds
- The caller sends a BYE
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', False)
defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session, callee_ip, callee_ports)
return defer
def wait(result, protocol, session, callee_ip, callee_ports):
print('got media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(stop_media, protocol, session, callee_ip, callee_ports)
reactor.callLater(5, defer.callback, None)
return defer
def stop_media(result, protocol, session, callee_ip, callee_ports):
print('stopping media')
defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()])
defer.addCallback(change_callee, protocol, session, callee_ip, callee_ports)
return defer
def change_callee(result, protocol, session, callee_ip, callee_ports):
print('sending new update for callee')
caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})])
callee_media = session.callee.set_media([('audio', 30020, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(start_new_media, protocol, session, media_defer, callee_ip, callee_ports)
return defer
-def start_new_media((caller_ip, caller_ports), protocol, session, media_defer, callee_ip, callee_ports):
+def start_new_media(caller_addr, protocol, session, media_defer, callee_ip, callee_ports):
+ (caller_ip, caller_ports) = caller_addr
print('starting new media')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
media_defer.addCallback(wait2, protocol, session)
return media_defer
def wait2(result, protocol, session):
print('got new media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(5, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'caller')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/updatetest2.py b/test/updatetest2.py
index edc1671..b38b0d8 100755
--- a/test/updatetest2.py
+++ b/test/updatetest2.py
@@ -1,116 +1,120 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a session with audio and video media flowing, after which
the callee removes the video stream and only audio flows:
- caller sends INVITE, callee sends 200 ok
- audio and video media flows for 5 seconds
- callee proposes to keep only the audio stream using a re-INVITE, caller
sends OK
- audio media flows for 5 seconds
- caller sends BYE
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session)
return defer
def wait(result, protocol, session):
print('got media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(stop_media, protocol, session)
reactor.callLater(5, defer.callback, None)
return defer
def stop_media(result, protocol, session):
print('stopping media')
defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()])
defer.addCallback(change_callee, protocol, session)
return defer
def change_callee(result, protocol, session):
print('sending new update for callee')
caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})])
callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = session.do_update(protocol, 'callee', 'request', False)
defer.addCallback(change_caller, protocol, session, media_defer)
return defer
-def change_caller((caller_ip, caller_ports), protocol, session, media_defer):
+def change_caller(caller_addr, protocol, session, media_defer):
+ (caller_ip, caller_ports) = caller_addr
print('sending new update for caller')
defer = session.do_update(protocol, 'caller', 'reply', True)
defer.addCallback(start_new_media, protocol, session, media_defer, caller_ip, caller_ports)
return defer
-def start_new_media((callee_ip, callee_ports), protocol, session, media_defer, caller_ip, caller_ports):
+def start_new_media(callee_addr, protocol, session, media_defer, caller_ip, caller_ports):
+ (callee_ip, callee_ports) = callee_addr
print('starting new media')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
media_defer.addCallback(wait2, protocol, session)
return media_defer
def wait2(result, protocol, session):
print('got new media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(5, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'caller')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40000, 'sendrecv', {}), ('video', 40010, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30000, 'sendrecv', {}), ('video', 30010, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/updatetest3.py b/test/updatetest3.py
index 54fa48e..66831fc 100755
--- a/test/updatetest3.py
+++ b/test/updatetest3.py
@@ -1,104 +1,106 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a session that starts with only video, then two audio
streams are added and finally only one of the audio streams remains.
"""
from common import *
def phase1(protocol, session):
print('setting up 1 video stream')
caller_media = session.caller.set_media([('video', 40000, 'sendrecv', {})])
callee_media = session.callee.set_media([('video', 30000, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = succeed(None)
defer.addCallback(caller_update, protocol, session, media_defer, phase2)
return defer
def phase2(result, protocol, session):
print('adding 2 audio streams')
caller_media = session.caller.set_media([('video', 40000, 'sendrecv', {}), ('audio', 40010, 'sendrecv', {}), ('audio', 40020, 'sendrecv', {})])
callee_media = session.callee.set_media([('video', 30000, 'sendrecv', {}), ('audio', 30010, 'sendrecv', {}), ('audio', 30020, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = succeed(None)
defer.addCallback(caller_update, protocol, session, media_defer, phase3)
return defer
def phase3(result, protocol, session):
print('removing 1 video and 1 audio stream')
caller_media = session.caller.set_media([('audio', 40020, 'sendrecv', {})])
callee_media = session.callee.set_media([('audio', 30020, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = succeed(None)
defer.addCallback(caller_update, protocol, session, media_defer, kthxbye)
return defer
def caller_update(result, protocol, session, media_defer, do_after):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, media_defer, do_after)
return defer
def callee_update(callee_addr, protocol, session, media_defer, do_after):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, media_defer, do_after)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, media_defer, do_after):
+def do_media(caller_addr, callee_addr, protocol, session, media_defer, do_after):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
media_defer.addCallback(wait, protocol, session, do_after)
return media_defer
def wait(result, protocol, session, do_after):
print('got media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(stop_media, protocol, session, do_after)
reactor.callLater(5, defer.callback, None)
return defer
def stop_media(result, protocol, session, do_after):
print('stopping media')
defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()])
defer.addCallback(do_after, protocol, session)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'caller')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
callee = Endpoint('Bob ', 'Callee UA', False)
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(phase1, session)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/updatetest4.py b/test/updatetest4.py
index a258450..7a44e87 100755
--- a/test/updatetest4.py
+++ b/test/updatetest4.py
@@ -1,115 +1,119 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a session with audio media flowing, after which
the callee changes the port of the media, e.g. through an UPDATE:
- caller sends INVITE, callee sends 200 ok
- audio and video media flows for 5 seconds
- callee changes the port of the audio stream through an UPATE or re-INVITE
- audio media flows for 5 seconds
- caller sends BYE
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session)
return defer
def wait(result, protocol, session):
print('got media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(stop_media, protocol, session)
reactor.callLater(5, defer.callback, None)
return defer
def stop_media(result, protocol, session):
print('stopping media')
defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()])
defer.addCallback(change_callee, protocol, session)
return defer
def change_callee(result, protocol, session):
print('sending new update for callee')
caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})])
callee_media = session.callee.set_media([('audio', 30010, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = session.do_update(protocol, 'callee', 'request', False)
defer.addCallback(change_caller, protocol, session, media_defer)
return defer
-def change_caller((caller_ip, caller_ports), protocol, session, media_defer):
+def change_caller(caller_addr, protocol, session, media_defer):
+ (caller_ip, caller_ports) = caller_addr
print('sending new update for caller')
defer = session.do_update(protocol, 'caller', 'reply', True)
defer.addCallback(start_new_media, protocol, session, media_defer, caller_ip, caller_ports)
return defer
-def start_new_media((callee_ip, callee_ports), protocol, session, media_defer, caller_ip, caller_ports):
+def start_new_media(callee_addr, protocol, session, media_defer, caller_ip, caller_ports):
+ (callee_ip, callee_ports) = callee_addr
print('starting new media')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
media_defer.addCallback(wait2, protocol, session)
return media_defer
def wait2(result, protocol, session):
print('got new media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(5, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'caller')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/updatetest5.py b/test/updatetest5.py
index 34a2482..28cc790 100755
--- a/test/updatetest5.py
+++ b/test/updatetest5.py
@@ -1,148 +1,152 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a call setup with an updated reply from the callee:
- The caller sends an INVITE
- The callee replies with a provisional response containg SDP e.g. 183
- Both parties start sending media
- Media flows for 5 seconds
- Media stops
- The callee sends another 183 with new port and to-tag (e.g. when the first PSTN gateway failed)
- Both parties start sending media
- Media flows for 5 seconds
- Media stops
- The callee sends a 200 OK with a new port
- Media flows again for 5 seconds
- The caller sends a BYE
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', False)
defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session, callee_ip, callee_ports)
return defer
def wait(result, protocol, session, callee_ip, callee_ports):
print('got media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(stop_media, protocol, session, callee_ip, callee_ports)
reactor.callLater(5, defer.callback, None)
return defer
def stop_media(result, protocol, session, callee_ip, callee_ports):
print('stopping media')
defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()])
defer.addCallback(change_callee_prov, protocol, session, callee_ip, callee_ports)
return defer
def change_callee_prov(result, protocol, session, callee_ip, callee_ports):
print('sending new provisional update for callee')
session.callee.tag = 'newtotag'
caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})])
callee_media = session.callee.set_media([('audio', 30010, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = session.do_update(protocol, 'callee', 'reply', False)
defer.addCallback(start_new_media_prov, protocol, session, media_defer, callee_ip, callee_ports)
return defer
-def start_new_media_prov((caller_ip, caller_ports), protocol, session, media_defer, callee_ip, callee_ports):
+def start_new_media_prov(caller_addr, protocol, session, media_defer, callee_ip, callee_ports):
+ (caller_ip, caller_ports) = caller_addr
print('starting new media')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
media_defer.addCallback(wait2, protocol, session, callee_ip, callee_ports)
return media_defer
def wait2(result, protocol, session, callee_ip, callee_ports):
print('got new media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(stop_media_prov, protocol, session, callee_ip, callee_ports)
reactor.callLater(5, defer.callback, None)
return defer
def stop_media_prov(result, protocol, session, callee_ip, callee_ports):
print('stopping media')
defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()])
defer.addCallback(change_callee, protocol, session, callee_ip, callee_ports)
return defer
def change_callee(result, protocol, session, callee_ip, callee_ports):
print('sending new update for callee')
caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})])
callee_media = session.callee.set_media([('audio', 30020, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(start_new_media, protocol, session, media_defer, callee_ip, callee_ports)
return defer
-def start_new_media((caller_ip, caller_ports), protocol, session, media_defer, callee_ip, callee_ports):
+def start_new_media(caller_addr, protocol, session, media_defer, callee_ip, callee_ports):
+ (caller_ip, caller_ports) = caller_addr
print('starting new media')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
media_defer.addCallback(wait3, protocol, session)
return media_defer
def wait3(result, protocol, session):
print('got new media, waiting 5 seconds')
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(5, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'caller')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()
diff --git a/test/updatetest6.py b/test/updatetest6.py
index 8ab6efe..9dff1c9 100755
--- a/test/updatetest6.py
+++ b/test/updatetest6.py
@@ -1,160 +1,166 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# Copyright (C) 2008 AG Projects
#
"""
This test simulates a session with audio and video media flowing, after which
the callee removes the video stream and only audio flows. After a while, the
video stream is introduced back and both audio and video flow for a while:
- caller sends INVITE, callee sends 200 ok
- audio and video media flows for 15 seconds
- callee proposes to keep only the audio stream using a re-INVITE, caller
sends OK
- audio media flows for 15 seconds
- callee proposes to reintroduce a video stream using a re-INVITE, caller
sends OK
- audio and video media flows for 15 seconds
- caller sends BYE
"""
from common import *
def caller_update(protocol, session, caller_media, callee_media):
print('doing update for caller')
defer = session.do_update(protocol, 'caller', 'request', False)
defer.addCallback(callee_update, protocol, session, caller_media, callee_media)
return defer
def callee_update(callee_addr, protocol, session, caller_media, callee_media):
print('doing update for callee')
defer = session.do_update(protocol, 'callee', 'reply', True)
defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media)
return defer
-def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media):
+def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media):
+ (caller_ip, caller_ports) = caller_addr
+ (callee_ip, callee_ports) = callee_addr
print('starting media for both parties')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
defer = DeferredList([caller_media, callee_media])
defer.addCallback(wait, protocol, session)
return defer
def wait(result, protocol, session):
print('got media, waiting 15 seconds')
defer = Deferred()
defer.addCallback(stop_media, protocol, session)
reactor.callLater(15, defer.callback, None)
return defer
def stop_media(result, protocol, session):
print('stopping media')
defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()])
defer.addCallback(change_callee, protocol, session)
return defer
def change_callee(result, protocol, session):
print('sending new update for callee')
caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {}), ('video', 0, 'sendrecv', {})])
callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {}), ('video', 0, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = session.do_update(protocol, 'callee', 'request', False)
defer.addCallback(change_caller, protocol, session, media_defer)
return defer
-def change_caller((caller_ip, caller_ports), protocol, session, media_defer):
+def change_caller(caller_addr, protocol, session, media_defer):
+ (caller_ip, caller_ports) = caller_addr
print('sending new update for caller')
defer = session.do_update(protocol, 'caller', 'reply', True)
defer.addCallback(start_new_media, protocol, session, media_defer, caller_ip, caller_ports)
return defer
-def start_new_media((callee_ip, callee_ports), protocol, session, media_defer, caller_ip, caller_ports):
+def start_new_media(callee_addr, protocol, session, media_defer, caller_ip, caller_ports):
+ (callee_ip, callee_ports) = calee_addr
print('starting new media')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
media_defer.addCallback(wait2, protocol, session)
return media_defer
def wait2(result, protocol, session):
print('got new media, waiting 15 seconds')
defer = Deferred()
defer.addCallback(stop_media2, protocol, session)
reactor.callLater(15, defer.callback, None)
return defer
def stop_media2(result, protocol, session):
print('stopping media')
defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()])
defer.addCallback(change_callee2, protocol, session)
return defer
def change_callee2(result, protocol, session):
print('sending new update for callee')
caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {}), ('video', 40010, 'sendrecv', {})])
callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {}), ('video', 30010, 'sendrecv', {})])
media_defer = DeferredList([caller_media, callee_media])
defer = session.do_update(protocol, 'callee', 'request', False)
defer.addCallback(change_caller2, protocol, session, media_defer)
return defer
-def change_caller2((caller_ip, caller_ports), protocol, session, media_defer):
+def change_caller2(caller_addr, protocol, session, media_defer):
+ (caller_ip, caller_ports) = caller_addr
print('sending new update for caller')
defer = session.do_update(protocol, 'caller', 'reply', True)
defer.addCallback(start_new_media2, protocol, session, media_defer, caller_ip, caller_ports)
return defer
-def start_new_media2((callee_ip, callee_ports), protocol, session, media_defer, caller_ip, caller_ports):
+def start_new_media2(callee_addr, protocol, session, media_defer, caller_ip, caller_ports):
+ (callee_ip, callee_ports) = callee_addr
print('starting new media')
session.caller.start_media(caller_ip, caller_ports)
session.callee.start_media(callee_ip, callee_ports)
media_defer.addCallback(wait3, protocol, session)
return media_defer
def wait3(result, protocol, session):
print('got new media, waiting 15 seconds')
defer = Deferred()
defer.addCallback(kthxbye, protocol, session)
reactor.callLater(15, defer.callback, None)
return defer
def kthxbye(result, protocol, session):
print('sending remove')
return session.do_remove(protocol, 'caller')
def disconnect(result, connector):
print('disconnecting')
connector.disconnect()
reactor.callLater(1, reactor.stop)
def catch_all_err(failure):
print(failure)
if __name__ == '__main__':
caller = Endpoint('Alice ', 'Caller UA', True)
caller_media = caller.set_media([('audio', 40000, 'sendrecv', {}), ('video', 40010, 'sendrecv', {})])
callee = Endpoint('Bob ', 'Callee UA', False)
callee_media = callee.set_media([('audio', 30000, 'sendrecv', {}), ('video', 30010, 'sendrecv', {})])
session = Session(caller, callee)
connector, defer = connect_to_dispatcher()
defer.addCallback(caller_update, session, caller_media, callee_media)
defer.addCallback(disconnect, connector)
defer.addErrback(catch_all_err)
reactor.run()