Page MenuHomePhabricator

No OneTemporary

diff --git a/README b/README
index 6596de1..a02f0e2 100644
--- a/README
+++ b/README
@@ -1,348 +1,378 @@
MediaProxy
----------
Authors: Ruud Klaver, Dan Pascu, Saul Ibarra
Home page: http://mediaproxy.ag-projects.com
License
-------
This software is licensed according to the GNU General Public License
version 2. See LICENSE file for more details.
For other licensing options please contact sales-request@ag-projects.com
Description
-----------
MediaProxy is a media relay for RTP/RTCP and UDP streams that works in
tandem with OpenSIPS to provide NAT traversal capability for media streams
from SIP user agents located behind NAT.
When using MediaProxy, NAT traversal for RTP media will work without any
settings in the SIP User Agents or the NAT router.
Features
--------
- Scalability of thousands of calls per server limited only by the Linux
kernel networking layer and network interface bandwidth
- Supports multiple chained relays as long as each has a public IP
- TLS encryption between the relays and dispatcher
- T.38 fax support
- Graceful shutdown capability
- Automatic load balancing and redundancy among all media relays
- Real-time sessions statistics
- Configurable IP and UDP port range
- Support for any combination of audio and video streams
- Ability to use OpenSIPS' MI interface to close a call that did timeout
- Radius accounting of IP network traffic
- Database accounting of complete media information including all streams,
their type, codecs and duration.
- Supports ICE negotiation by behaving like a TURN relay candiate
+ - Supports routing media between multiple interfaces
Background
-----------
MediaProxy 2.0 is the second generation media relay application which is
based on a completely new design that allows for major improvements in areas
such as scalability (an order of magnitude more scalable than previous
version) and security (communication between relay and dispatcher is
encrypted).
New features have been added to support call flows related to user mobility
and fax transmission.
Architecture
------------
MediaProxy consists of a dispatcher and one or more media relays.
The dispatcher component always runs on the same host as OpenSIPS and
communicates with its mediaproxy module through a UNIX domain socket. The
relay(s) connect to the dispatcher using TLS. This relay component may be on
the same or on a different host as OpenSIPS. There may be several relays for
the dispatcher to choose from and a relay may service more than one
dispatcher.
When OpenSIPS requests that a call be relayed, the dispatcher will forward
this request to one of its connected relays, along with some data from the
SDP. The relay will allocate a set of UDP ports for this session, depending
on the number of proposed streams. It will inform the dispatcher which ports
it has allocated so that it may in turn notify the mediaproxy module of
OpenSIPS, which will replace the relevant parts of the SDP.
The same is done for any SIP messages from the callee, thus all the media
streams will be sent through the relay. When the session between caller and
callee has finished, either through a SIP BYE or because the media is no
longer flowing and has timed out, the relay will send session information to
the dispatcher, which can store this information using one or more accounting
modules.
The session information may also be queried using a management interface on
the dispatcher.
All of this is illustrated in the following diagram:
+---+ +---+
| | +---------------------+ | |
| | | SIP Proxy | | |
| | | +----------+ | SIP | |
| |<--+->| OpenSIPS |<------+------------------->| |
| | | +----------+ | | |
| | | ^ | | |
| | | | UNIX socket | | |
| C | | v | | C |
| A | | +------------+ | +------------+ | A |
| L | | | Dispatcher |<-----+-->| Management | | L |
| L | | +------------+ TCP | | client | | L |
| E | | ^ /TLS | +------------+ | E |
| R | | | | | E |
| | +---------+-----------+ | |
| | | | |
| | | TLS | |
| | v | |
| | +-------------+ UDP | |
| |<---->| Relay |<----------------------->| |
| | +-------------+ RTP / RTCP | |
+---+ +---+
Please note that the accounting modules are not shown.
Compatibility and pre-requisites
--------------------------------
Both OpenSIPS and MediaProxy must use a public IP address.
To run the software, you will need a server running the Linux Operating
System using a kernel version 2.6.18 or higher that has been compiled with
connection tracking support (conntrack). IPtables 1.4.3 or higher is also
required. Because of this dependency on Linux, other operating systems are
not supported. This dependency only applies to the media relay component.
The dispatcher component which runs on the same host as OpenSIPS, can run
on any platform that has a python interpreter and supports the twisted
framework.
Communication between the dispatcher and the relays uses TLS encryption and
requires a set of X509 certificates to work. For more information about this
please read tls/README which contains information about the sample certificates
that are included as well as information about how to generate your own.
MediaProxy is meant to be used together with OpenSIPS' mediaproxy module.
This version of MediaProxy (2.0 or higher) cannot be used in combination
with any version of OpenSIPS older than 1.4 or any components of MediaProxy
older than 2.0. You must completely upgrade any previous installation of
OpenSER to OpenSIPS to use this version of MediaProxy.
No STUN or TURN support are required in the clients.
The SIP User Agents must work symmetrically (that is to send and receive
data on the same port for each stream), which is documented in RFC 4961.
To display the history of the media streams CDRTool 6.5 or higher is
required.
Some features that were present in the previous version have been removed:
- Support for specifying media relays per domain has been discontinued
- Support for DNS records has been discontinued
- Support for asymmetric clients has been discontinued
- Support for other operating systems than Linux has been discontinued
(only for the media relay, as the dispatcher has no such limitation)
For information of how to install MediaProxy, please consult the INSTALL
file.
Important note
--------------
For Linux kernels >= 4.9 and < 5.1 you must add a rule to trigger the
connection tracking:
sudo iptables -I INPUT -m state --state NEW
Starting with kernel 5.1 you can enable enable_hooks parameter:
modprobe nf_conntrack enable_hooks=1
or use the iptables rule above.
For more information about this requirement see:
https://github.com/torvalds/linux/commit/ba3fbe663635ae7b33a2d972c5d2def036258e42
Operation
---------
Before the relay is run, please make sure that /proc/sys/net/ipv4/ip_forward
is set to "1". Also for newer kernels ACCT on connection tracking needs to
be enabled. Therefore /proc/sys/net/netfilter/nf_conntrack_acct must be set to
"1".
Both the dispatcher and the relay should be executed with root privileges.
With no arguments, both applications will automatically fork into the
background and log to syslog. They can remain in the foreground and log to
console when given the --no-fork argument.
The relay can be shut down in two ways. When receiving either an INT or TERM
signal, the relay will terminate all of its sessions immediately and inform
the dispatcher that those sessions have expired. When given the HUP signal,
it will not accept any new sessions from the dispatcher and wait for all of
the running sessions to expire, thus terminating gracefully.
At the very least a set of TLS credentials is required. Sample certificates
for this are included in the tls/ subdirectory.
DO NOT USE THESE IN A PRODUCTION ENVIRONMENT, but only for testing purposes.
For more information about TLS certificates and how to generate your own,
check the tls/README file.
Accounting
----------
MediaProxy is capable to do additional per call accounting with information
related to the media streams used by the call. MediaProxy has a modular
interface to the accounting system, allowing for new modules to be easily
implemented. Currently it supports database and radius backends. Multiple
backends can be configured and used simultaneously.
Radius accounting
-----------------
The radius backend logs very basic information about the media streams. The
limited nature of the logged information is mainly given by the limitations
imposed by the radius protocol to the data size.
The information sent in the radius packet is shown below:
Acct-Status-Type = "Update"
User-Name = "mediaproxy@default"
Acct-Session-Id = call_id
Sip-From-Tag = from_tag
Sip-To-Tag = to_tag
Acct-Session-Time = call duration
Acct-Input-Octets = bytes received from caller
Acct-Output-Octets = bytes received from callee
NAS-IP-Address = media-relay address
Sip-User-Agents = caller + callee user agents
Sip-Applications = "Audio", "Video", ...
Media-Codecs = codecs used by streams (comma separated)
Media-Info = "timeout" or ""
Acct-Delay-Time = post dial delay (seconds from INVITE to 1st media packet)
Database accounting
-------------------
The database backend logs all the information related to the media streams that
were created/closed during the whole session. This information is stored as a
JSON encoded string in a BLOB column in the database, along with the call_id,
from_tag and to_tag columns that can be used to retrieve the media information
for a given call. The database table and column names are fully configurable
in the database section of the configuration file.
The table used to store these records, is automatically created by the media
dispatcher on startup, if it's not present. For this to happen, the user that
is configured in the dburi option in the database section, must have the CREATE
and ALTER rights on the database specified in the same dburi. If this is not
possible, then the media dispatcher will log an error indicating why it could
not create the table and also output the table definition that can be used by
some human operator to manually create the table. However, the recommended
way is to grant the CREATE and ALTER privileges to the user in the dburi over
the database specified in the same dburi.
The database module uses SQLObject to access the database, which means it can
work with a lot of databases, by simply changing the scheme in the dburi.
Currently the following databases are supported: mysql, postgres, sqlite,
firebird, maxdb, mssql and sybase.
Closing expired calls
---------------------
Starting with version 2.1.0, MediaProxy supports closing calls for which all
the media streams did timeout, but for which no BYE was received to close the
call in the standard way.
This feature will only work, when the OpenSIPS mediaproxy module uses the
engage_media_proxy() command to start MediaProxy for a given call. In this
case the mediaproxy module uses the dialog module to keep track of the call
and can pass the dialog id to the media dispatcher. When a media session is
expired because all streams did timeout, but no closing request was received
from the proxy, the media dispatcher will use the dialog id that was received
from the mediaproxy module, to issue a dlg_end_dlg request into the OpenSIPS'
MI interface, instructing OpenSIPS to generate the BYEs for the call, closing
it in a clean way and generating the accounting records.
To use this, the mi_datagram module must be loaded and configured to use a
UNIX filesystem socket which must also be configured into the OpenSIPS section
of the MediaProxy configuration as socket_path.
This feature is not available when using the use_media_proxy/end_media_session
functions in the proxy configuration, because in that case there is no dialog
that is tracked by the proxy which could be terminated using dlg_end_dlg.
+Multiple interfaces
+-------------------
+
+When using MediaProxy, the default IP address of the relay machine will appear
+in the c line of the SDP proposed to each party.
+
+On systems with multiple network interfaces, this IP address can be
+automatically set with the IP addresss that coresponds to the interface that
+has a route for the IP adress of each side of the call.
+
+In order to decide which network interface should be used, the mp_signaling_ip
+avp in OpenSIPS configuration should be set as follows:
+
+$avp(mp_signaling_ip) = sourceIP_destinationIP
+
+The sourceIP is the IP address where the SIP INVITE originated from. The
+destinationIP is the IP address where the SIP INVITE will be sent to.
+
+If destinationIP is not known, $avp(mp_signaling_ip) can be set only to
+sourceIP. Otherwise, if the avp is not set, the source IP address of the
+original SIP INVITE packet will be used.
+
+This behaviour can be enabled my setting auto_detect_interfaces to True in the
+relay configuration.
+
+The IP address can also be always overwritten by configuring advertised_ip in
+the relay configuration. If so, auto_detect_interfaces setting has no effect.
+
+
Gracefull shutdown
------------------
To tell media-relay component to gracefully shutdown when using systemd:
sudo systemctl reload mediaproxy-relay
The reload command will send the HUP signal to the PID of the relay
component and the software will shutdown when the last relayed call has
ended.
Management interface
--------------------
The management interface will accept commands terminated by \r\n. It will
return the results of the command, one per line, terminated by an empty
line (also \r\n terminated).
Currently two commands are supported:
sessions : This will have the dispatcher query all of its connected relays
for active sessions. For every sessions it finds it will return
one line with a JSON encoded dictionary containing session
information.
summary : This will have the dispatcher present a summary of each of its
connected relays. The results are returned as a JSON encoded
dictionary, one line per relay.
Free support
------------
MediaProxy is developed and supported by AG Projects. AG Projects offers
best-effort free support for MediaProxy. "best-effort" means that we try to
solve the bugs you report or help fix your problems as soon as we can,
subject to available resources.
You may report bugs or feature request to:
users@lists.opensips.org
A mailing list archive is available at:
http://lists.opensips.org/cgi-bin/mailman/listinfo/users
Commercial support
------------------
Visit http://ag-projects.com
diff --git a/config.ini.sample b/config.ini.sample
index 3913f32..538cf73 100644
--- a/config.ini.sample
+++ b/config.ini.sample
@@ -1,234 +1,240 @@
[Relay]
; A list of dispatchers to connect to, separated by spaces. The format is
; "host[:port] [host[:port] ...]". If a port is not specified the default port
; of 25060 will be used. "host" can be one of the following:
; - A domain name that has a SRV record for a SIP proxy, i.e. at
; "_sip._udp.<domain>". If the DNS lookup for this succeeds the relay
; will connect to the IP address of the SIP proxy on the port specified in
; this configuration.
; - A hostname. The lookup for this will be performed if the SRV lookup
; fails.
; - An IP address. The relay will connect directly to this address.
; Both the SRV and hostname lookups will be periodically refreshed (see
; "dns_check_interval" below).
;
;dispatchers = example.com 1.2.3.4:12345
; Specify extra checks to be performed on the dispatcher TLS credentials before
; considering the connection with the dispatcher successful. The passport is
; specified as a list of attribute/value pairs in the form:
; AN:value[, AN:value...]
; where the attribute name (AN) is one of the available attribute names from
; the X509 certificate subject: O, OU, CN, C, L, ST, EMAIL. The value is a
; string that has to match with the corresponding attribute value from the
; dispatcher certificate. A wildcard (*) can be used in the value at the
; beginning or the end of the string to indicate that the corresponding
; attribute from the dispatcher certificate must end with respectively to
; start with the given string (excluding the wildcard).
; For example using this passport:
; passport = O:AG Projects, CN:*dispatcher
; means that a connection with a dispatcher will only be accepted if the
; dispatcher certificate subject has organization set to "AG Projects" and
; the common name ends with "dispatcher". To specify that no additional
; identity checks need to be performed, use the keyword None. If passport
; is None, then only the certificate signature is verified against the
; certificate authority in tls/ca.pem (signature is always verified even
; when passport is None).
;
; Default value is None.
;
;passport = None
; The host IP address used for relaying streams. The default for this value
; is to use the IP address of the interface that has the default route. This
; is the most appropriate choice for almost any situation. Unless you need to
; use a very specific interface, which is not the default one, there is no need
; to set this option. Leave this option commented to use the default value.
;relay_ip = <default host IP>
+; The IP address of the relay can be replaced with the IP address of the
+; interface coreponding to the sourceIP and destinationIP of the call.
+; if set to False the relay_ip will be used instead
+;auto_detect_interfaces = False
+
; The host IP address to return when a session is allocated in the relay. This
; could be of use in case the relay is behind NAT but it has a 1 to 1 mapping
; with a public IP address, like Amazon EC2, for example.
+; If set, auto_detect_interfaces setting will be ignored.
;advertised_ip =
; The port range to use for relaying media streams in the form start:end with
; start and end being even numbers in the [1024, 65536] range and start < end
; The default range is 50000:60000. You should allocate 4 times the number of
; streams you plan for the relay to handle simultaneously. The default range
; having 10000 ports, is able to handle up to 2500 streams.
;
;port_range = 50000:60000
; Logging level (one of CRITICAL, ERROR, WARNING, INFO or DEBUG)
;log_level = INFO
; The amount of time to wait for a stream in a new SDP offer to start sending
; data before the relay decides that it has timed out. The default value is 90
; seconds. This only applies to the initial setup stage, before the first
; packet for a stream is received (from both ends). After the stream is started
; and the conntrack rule is in place, the idle timeout (how long before the
; conntrack rule expires when no traffic is received) is controlled by a kernel
; setting that defaults to 180 seconds and can be adjusted in:
; /proc/sys/net/ipv4/netfilter/ip_conntrack_udp_timeout_stream
;
;stream_timeout = 90
; Amount of time a call can be on hold before it is declared expired by the
; relay. The default value is 7200 seconds (2 hours).
;
;on_hold_timeout = 7200
; How often to check in DNS if the SRV and A records for the dispatcher have
; changed. Interval is in seconds and the default value is 60 seconds.
;
;dns_check_interval = 60
; If the relay cannot connect to a dispatcher is should retry after this
; amount of seconds. The default value is 10 seconds.
;
;reconnect_delay = 10
; How often to sample the aggregate amount of data processed by the relay, in
; order to compute an average of the relayed traffic over that period. The
; value is expressed in seconds and the default value is 15 seconds.
; Use 0 to disable it in case you have to many streams processed by the relay
; and it warns you in syslog that gathering this information takes too long.
;
;traffic_sampling_period = 15
; Specify a list of network ranges (in CIDR notation) for which media is relayed
; even if no packet was received from the endpoint and the IP address is private.
;routable_private_ranges = 192.168.1.0/24
[Dispatcher]
; Local socket on which to communicate with OpenSIPS. The OpenSIPS mediaproxy
; module should be configured to connect to this socket. If a relative path,
; the runtime directory will be prepended. Default value is dispatcher.sock.
;
;socket_path = dispatcher.sock
; Listen address for incoming connections from the relays. The format is
; "ip[:port]". If the ip is "0.0.0.0" or the keyword "any", the dispatcher
; will listen on all interfaces of this host. If the port is not specified,
; the dispatcher will listen on the default port of 25060.
;
;listen = 0.0.0.0
; Listen address for incoming management interface connections. Clients can
; connect to this and issue commands to query the status of the relays and
; their sessions. The format is "ip[:port]". If the ip is "0.0.0.0" or the
; keyword "any", the dispatcher will listen on all interfaces of this host.
; If the port is not specified, the dispatcher will listen on the default
; port of 25061.
;
;listen_management = 0.0.0.0
; Whether or not to use TLS on the management interface. Note that the same
; TLS credentials are used for both the relay and the management interface
; connections.
;
; Default value is yes.
;
;management_use_tls = yes
; Specify extra checks to be performed on the relay TLS credentials before
; considering the connection with the relay successful. The passport is
; specified as a list of attribute/value pairs in the form:
; AN:value[, AN:value...]
; where the attribute name (AN) is one of the available attribute names from
; the X509 certificate subject: O, OU, CN, C, L, ST, EMAIL. The value is a
; string that has to match with the corresponding attribute value from the
; relay certificate. A wildcard (*) can be used in the value at the beginning
; or the end of the string to indicate that the corresponding attribute from
; the relay certificate must end with respectively to start with the given
; string (excluding the wildcard).
; For example using this passport:
; passport = O:AG Projects, CN:relay*
; means that a connection with a relay will only be accepted if the relay
; certificate subject has organization set to "AG Projects" and the common
; name starts with "relay". To specify that no additional identity checks
; need to be performed, use the keyword None. If passport is None, then only
; the certificate signature is verified against the certificate authority in
; tls/ca.pem (signature is always verified even when passport is None).
;
; Default value is None.
;
;passport = None
; This option is similar to passport above, but applies to the management
; interface connections instead of relay connections. It specifies extra
; checks to be performed on the TLS credentials supplied by an entity that
; connects to the management interface. Please consult passport above for
; a detailed description of the possible values for this option.
;
; If management_use_tls is false, this option is ignored.
;
; Default value is None.
;
;management_passport = None
; Logging level (one of CRITICAL, ERROR, WARNING, INFO or DEBUG)
;log_level = INFO
; Timeout value in second for individual relays. When a command is sent from
; the dispatcher to a relay it will wait this amount of seconds for a reply.
; The default is 5 seconds.
;
;relay_timeout = 5
; A comma separated list of accounting backends that will be used to save
; accounting data with the session information once a session has finished.
; Currently 2 backends are available: "radius" and "database". If enabled
; they can be configured below in their respective sections. The default
; is to use no accounting backend.
;
;accounting =
[TLS]
; Path to the certificates. If relative, it will be looked up in both the
; application directory (for a standalone installation) and /etc/mediaproxy,
; the former taking precedence if found.
;
;certs_path = tls
; How often (in seconds) to verify the peer certificate for expiration and
; revocation. Default value is 300 seconds (5 minutes)
;
;verify_interval = 300
[Database]
; This section needs to be configured if database accounting is enabled
; Database URI in the form: scheme://user:password@host/database
;dburi = mysql://mediaproxy:CHANGEME@localhost/mediaproxy
; Name for the table.
;sessions_table = media_sessions
; Column names. Columns are strings except for info which is a BLOB
;
;callid_column = call_id
;fromtag_column = from_tag
;totag_column = to_tag
;info_column = info
[Radius]
; This section needs to be configured if radius accounting is enabled
; OpenSIPS RADIUS configuration file. All RADIUS configuration parameters
; will be read from this file, including dictionary files.
;
;config_file = /etc/opensips/radius/client.conf
; Additional dictionary file with MediaProxy specific attributes.
;additional_dictionary = radius/dictionary
[OpenSIPS]
; Configure interaction between the media dispatcher and OpenSIPS
; Path to OpenSIPS's UNIX filesystem socket from the mi_datagram module.
;socket_path = /run/opensips/socket
diff --git a/mediaproxy/configuration/__init__.py b/mediaproxy/configuration/__init__.py
index f7599fb..60a3ba6 100644
--- a/mediaproxy/configuration/__init__.py
+++ b/mediaproxy/configuration/__init__.py
@@ -1,90 +1,91 @@
from application import log
from application.configuration import ConfigSection, ConfigSetting
from application.configuration.datatypes import IPAddress, LogLevel, NetworkRangeList
from application.system import host
from mediaproxy import configuration_file
from mediaproxy.configuration.datatypes import AccountingModuleList, DispatcherIPAddress, DispatcherAddressList, DispatcherManagementAddress, PortRange, PositiveInteger, SIPThorDomain, X509NameValidator
class DispatcherConfig(ConfigSection):
__cfgfile__ = configuration_file
__section__ = 'Dispatcher'
socket_path = 'dispatcher.sock'
listen = ConfigSetting(type=DispatcherIPAddress, value=DispatcherIPAddress('any'))
listen_management = ConfigSetting(type=DispatcherManagementAddress, value=DispatcherManagementAddress('any'))
relay_timeout = 5 # How much to wait for an answer from a relay
relay_recover_interval = 60 # How much to wait for an unresponsive relay to recover, before disconnecting it
cleanup_dead_relays_after = 43200 # 12 hours
cleanup_expired_sessions_after = 86400 # 24 hours
management_use_tls = True
accounting = ConfigSetting(type=AccountingModuleList, value=[])
passport = ConfigSetting(type=X509NameValidator, value=None)
management_passport = ConfigSetting(type=X509NameValidator, value=None)
log_level = ConfigSetting(type=LogLevel, value=log.level.INFO)
class RelayConfig(ConfigSection):
__cfgfile__ = configuration_file
__section__ = 'Relay'
relay_ip = ConfigSetting(type=IPAddress, value=host.default_ip)
advertised_ip = ConfigSetting(type=IPAddress, value=None)
+ auto_detect_interfaces = False
stream_timeout = 90
on_hold_timeout = 7200
traffic_sampling_period = 15
userspace_transmit_every = 1
dispatchers = ConfigSetting(type=DispatcherAddressList, value=[])
port_range = PortRange('50000:60000')
dns_check_interval = PositiveInteger(60)
keepalive_interval = PositiveInteger(10)
reconnect_delay = PositiveInteger(10)
passport = ConfigSetting(type=X509NameValidator, value=None)
routable_private_ranges = ConfigSetting(type=NetworkRangeList, value=[])
log_level = ConfigSetting(type=LogLevel, value=log.level.INFO)
class OpenSIPSConfig(ConfigSection):
__cfgfile__ = configuration_file
__section__ = 'OpenSIPS'
socket_path = '/run/opensips/socket'
location_table = 'location'
class RadiusConfig(ConfigSection):
__cfgfile__ = configuration_file
__section__ = 'Radius'
config_file = '/etc/opensips/radius/client.conf'
additional_dictionary = 'radius/dictionary'
class DatabaseConfig(ConfigSection):
__cfgfile__ = configuration_file
__section__ = 'Database'
dburi = ''
sessions_table = 'media_sessions'
callid_column = 'call_id'
fromtag_column = 'from_tag'
totag_column = 'to_tag'
info_column = 'info'
class TLSConfig(ConfigSection):
__cfgfile__ = configuration_file
__section__ = 'TLS'
certs_path = 'tls'
verify_interval = 300
class ThorNetworkConfig(ConfigSection):
__cfgfile__ = configuration_file
__section__ = 'ThorNetwork'
domain = ConfigSetting(type=SIPThorDomain, value=None)
node_ip = host.default_ip
diff --git a/mediaproxy/mediacontrol.py b/mediaproxy/mediacontrol.py
index 3d7d0f7..c218b22 100644
--- a/mediaproxy/mediacontrol.py
+++ b/mediaproxy/mediacontrol.py
@@ -1,839 +1,856 @@
import hashlib
import struct
from application import log
+from application.system import host
from base64 import b64encode as base64_encode
from itertools import chain
from collections import deque
from operator import attrgetter
from time import time
from twisted.internet import reactor
from twisted.internet.interfaces import IReadDescriptor
from twisted.internet.protocol import DatagramProtocol
from twisted.internet.error import CannotListenError
from twisted.python.log import Logger
from zope.interface import implementer
from mediaproxy.configuration import RelayConfig
from mediaproxy.interfaces.system import _conntrack
from mediaproxy.iputils import is_routable_ip
from mediaproxy.scheduler import RecurrentCall, KeepRunning
UDP_TIMEOUT_FILE = '/proc/sys/net/netfilter/nf_conntrack_udp_timeout_stream'
rtp_payloads = {
0: 'G711u', 1: '1016', 2: 'G721', 3: 'GSM', 4: 'G723', 5: 'DVI4', 6: 'DVI4',
7: 'LPC', 8: 'G711a', 9: 'G722', 10: 'L16', 11: 'L16', 14: 'MPA', 15: 'G728',
18: 'G729', 25: 'CelB', 26: 'JPEG', 28: 'nv', 31: 'H261', 32: 'MPV', 33: 'MP2T',
34: 'H263'
}
class RelayPortsExhaustedError(Exception):
pass
if RelayConfig.relay_ip is None:
raise RuntimeError('Could not determine default host IP; either add default route or specify relay IP manually')
class SessionLogger(log.ContextualLogger):
def __init__(self, session):
super(SessionLogger, self).__init__(logger=log.get_logger()) # use the main logger as backend
self.session_id = session.call_id
def apply_context(self, message):
return '[session {0.session_id}] {1}'.format(self, message) if message != '' else ''
class Address(object):
"""Representation of an endpoint address"""
def __init__(self, host, port, in_use=True, got_rtp=False):
self.host = host
self.port = port
self.in_use = self.__bool__() and in_use
self.got_rtp = got_rtp
def __len__(self):
return 2
def __bool__(self):
return None not in (self.host, self.port)
def __getitem__(self, index):
return (self.host, self.port)[index]
def __contains__(self, item):
return item in (self.host, self.port)
def __iter__(self):
yield self.host
yield self.port
def __str__(self):
return self.__bool__() and ('%s:%d' % (self.host, self.port)) or 'Unknown'
def __repr__(self):
return '%s(%r, %r, in_use=%r, got_rtp=%r)' % (self.__class__.__name__, self.host, self.port, self.in_use, self.got_rtp)
def forget(self):
self.host, self.port, self.in_use, self.got_rtp = None, None, False, False
@property
def unknown(self):
return None in (self.host, self.port)
@property
def obsolete(self):
return self.__bool__() and not self.in_use
class Counters(dict):
def __add__(self, other):
n = Counters(self)
for k, v in other.items():
n[k] += v
return n
def __iadd__(self, other):
for k, v in other.items():
self[k] += v
return self
@property
def caller_bytes(self):
return self['caller_bytes']
@property
def callee_bytes(self):
return self['callee_bytes']
@property
def caller_packets(self):
return self['caller_packets']
@property
def callee_packets(self):
return self['callee_packets']
@property
def relayed_bytes(self):
return self['caller_bytes'] + self['callee_bytes']
@property
def relayed_packets(self):
return self['caller_packets'] + self['callee_packets']
class StreamListenerProtocol(DatagramProtocol):
noisy = False
def __init__(self):
self.cb_func = None
self.sdp = None
self.send_packet_count = 0
self.stun_queue = []
def datagramReceived(self, data, addr):
(host, port) = addr
if self.cb_func is not None:
self.cb_func(host, port, data)
def set_remote_sdp(self, ip, port):
if is_routable_ip(ip):
self.sdp = ip, port
else:
self.sdp = None
def send(self, data, is_stun, ip=None, port=None):
if is_stun:
self.stun_queue.append(data)
if ip is None or port is None:
# this means that we have not received any packets from this host yet,
# so we have not learnt its address
if self.sdp is None:
# we can't do anything if we haven't received the SDP IP yet or
# it was in a private range
return
ip, port = self.sdp
# we learnt the IP, empty the STUN packets queue
if self.stun_queue:
for data in self.stun_queue:
self.transport.write(data, (ip, port))
self.stun_queue = []
if not is_stun:
if not self.send_packet_count % RelayConfig.userspace_transmit_every:
self.transport.write(data, (ip, port))
self.send_packet_count += 1
def _stun_test(data):
# Check if data is a STUN request and if it's a binding request
if len(data) < 20:
return False, False
msg_type, msg_len, magic = struct.unpack('!HHI', data[:8])
if msg_type & 0xc == 0 and magic == 0x2112A442:
if msg_type == 0x0001:
return True, True
else:
return True, False
else:
return False, False
class MediaSubParty(object):
def __init__(self, substream, listener):
self.substream = substream
self.logger = substream.logger
self.listener = listener
self.listener.protocol.cb_func = self.got_data
self.remote = Address(None, None)
host = self.listener.protocol.transport.getHost()
self.local = Address(host.host, host.port)
self.timer = None
self.codec = 'Unknown'
self.got_stun_probing = False
self.reset()
def reset(self):
if self.timer and self.timer.active():
self.timer.cancel()
self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, 'no-traffic timeout', RelayConfig.stream_timeout)
self.remote.in_use = False # keep remote address around but mark it as obsolete
self.remote.got_rtp = False
self.got_stun_probing = False
self.listener.protocol.send_packet_count = 0
def before_hold(self):
if self.timer and self.timer.active():
self.timer.cancel()
self.timer = reactor.callLater(RelayConfig.on_hold_timeout, self.substream.expired, 'on hold timeout', RelayConfig.on_hold_timeout)
def after_hold(self):
if self.timer and self.timer.active():
self.timer.cancel()
if not self.remote.in_use:
self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, 'no-traffic timeout', RelayConfig.stream_timeout)
def got_data(self, host, port, data):
if (host, port) == tuple(self.remote):
if self.remote.obsolete:
# the received packet matches the previously used IP/port,
# which has been made obsolete, so ignore it
return
else:
if self.remote.in_use:
# the received packet is different than the recorded IP/port,
# so we will discard it
return
# we have learnt the remote IP/port
self.remote.host, self.remote.port = host, port
self.remote.in_use = True
self.logger.info('discovered peer: %s' % self.substream.stream)
is_stun, is_binding_request = _stun_test(data)
self.substream.send_data(self, data, is_stun)
if not self.remote.got_rtp and not is_stun:
# This is the first RTP packet received
self.remote.got_rtp = True
if self.timer:
if self.timer.active():
self.timer.cancel()
self.timer = None
if self.codec == 'Unknown' and self.substream is self.substream.stream.rtp:
try:
pt = data[1] & 127
except IndexError:
pass
else:
if pt > 95:
self.codec = 'Dynamic(%d)' % pt
elif pt in rtp_payloads:
self.codec = rtp_payloads[pt]
else:
self.codec = 'Unknown(%d)' % pt
self.substream.check_create_conntrack()
if is_binding_request:
self.got_stun_probing = True
def cleanup(self):
if self.timer and self.timer.active():
self.timer.cancel()
self.timer = None
self.listener.protocol.cb_func = None
self.substream = None
class MediaSubStream(object):
def __init__(self, stream, listener_caller, listener_callee):
self.stream = stream
self.logger = stream.logger
self.forwarding_rule = None
self.caller = MediaSubParty(self, listener_caller)
self.callee = MediaSubParty(self, listener_callee)
self._counters = Counters(caller_bytes=0, callee_bytes=0, caller_packets=0, callee_packets=0)
@property
def counters(self):
"""Accumulated counters from all the forwarding rules the stream had"""
if self.forwarding_rule is None:
return self._counters
else:
try:
self.logger.debug(', '.join([f"{key}={self.forwarding_rule.counters[key]}" for key in self.forwarding_rule.counters.keys()]))
return self._counters + self.forwarding_rule.counters
except _conntrack.Error:
return self._counters
def _stop_relaying(self):
if self.forwarding_rule is not None:
try:
self.logger.info(', '.join([f"{key}={self.forwarding_rule.counters[key]}" for key in self.forwarding_rule.counters.keys()]))
self._counters += self.forwarding_rule.counters
except _conntrack.Error:
pass
self.forwarding_rule = None
def reset(self, party):
if party == 'caller':
self.caller.reset()
else:
self.callee.reset()
self._stop_relaying()
def check_create_conntrack(self):
if self.stream.first_media_time is None:
self.stream.first_media_time = time()
if self.caller.remote.in_use and self.caller.remote.got_rtp and self.callee.remote.in_use and self.callee.remote.got_rtp:
self.forwarding_rule = _conntrack.ForwardingRule(self.caller.remote, self.caller.local, self.callee.remote, self.callee.local, self.stream.session.mark)
self.forwarding_rule.expired_func = self.conntrack_expired
def send_data(self, source, data, is_stun):
if source is self.caller:
dest = self.callee
else:
dest = self.caller
if dest.remote:
# if we have already learnt the remote address of the destination, use that
ip, port = dest.remote.host, dest.remote.port
dest.listener.protocol.send(data, is_stun, ip, port)
else:
# otherwise use the IP/port specified in the SDP, if public
dest.listener.protocol.send(data, is_stun)
def conntrack_expired(self):
try:
timeout_wait = int(open(UDP_TIMEOUT_FILE).read())
except:
timeout_wait = 0
self.expired('conntrack timeout', timeout_wait)
def expired(self, reason, timeout_wait):
self._stop_relaying()
self.stream.substream_expired(self, reason, timeout_wait)
def cleanup(self):
self.caller.cleanup()
self.callee.cleanup()
self._stop_relaying()
self.stream = None
class MediaParty(object):
- def __init__(self, stream):
+ def __init__(self, stream, party):
self.manager = stream.session.manager
self.logger = stream.logger
self._remote_sdp = None
self.is_on_hold = False
self.uses_ice = False
while True:
self.listener_rtp = None
self.ports = port_rtp, port_rtcp = self.manager.get_ports()
+ listen_ip = None
+ if RelayConfig.auto_detect_interfaces and not RelayConfig.advertised_ip:
+ if party == 'callee' and stream.session.destination_ip:
+ listen_ip = host.outgoing_ip_for(stream.session.destination_ip)
+ else:
+ listen_ip = host.outgoing_ip_for(stream.session.caller_ip)
+
try:
- self.listener_rtp = reactor.listenUDP(port_rtp, StreamListenerProtocol(), interface=RelayConfig.relay_ip)
- self.listener_rtcp = reactor.listenUDP(port_rtcp, StreamListenerProtocol(), interface=RelayConfig.relay_ip)
+ self.listener_rtp = reactor.listenUDP(port_rtp, StreamListenerProtocol(), interface=listen_ip or RelayConfig.relay_ip)
+ self.listener_rtcp = reactor.listenUDP(port_rtcp, StreamListenerProtocol(), interface=listen_ip or RelayConfig.relay_ip)
except CannotListenError:
if self.listener_rtp is not None:
self.listener_rtp.stopListening()
self.manager.set_bad_ports(self.ports)
self.logger.warning('Cannot use port pair %d/%d' % self.ports)
else:
break
def _get_remote_sdp(self):
return self._remote_sdp
def _set_remote_sdp(self, addr):
(ip, port) = addr
self._remote_sdp = ip, port
self.listener_rtp.protocol.set_remote_sdp(ip, port)
remote_sdp = property(_get_remote_sdp, _set_remote_sdp)
def cleanup(self):
self.listener_rtp.stopListening()
self.listener_rtcp.stopListening()
self.manager.free_ports(self.ports)
self.manager = None
class MediaStream(object):
def __init__(self, session, media_type, media_ip, media_port, direction, media_parameters, initiating_party):
self.is_alive = True
self.session = session # type: Session
self.logger = session.logger
self.media_type = media_type
- self.caller = MediaParty(self)
- self.callee = MediaParty(self)
+ self.caller = MediaParty(self, 'caller')
+ self.callee = MediaParty(self, 'callee')
self.rtp = MediaSubStream(self, self.caller.listener_rtp, self.callee.listener_rtp)
self.rtcp = MediaSubStream(self, self.caller.listener_rtcp, self.callee.listener_rtcp)
getattr(self, initiating_party).remote_sdp = (media_ip, media_port)
getattr(self, initiating_party).uses_ice = (media_parameters.get('ice', 'no') == 'yes')
self.check_hold(initiating_party, direction, media_ip)
self.create_time = time()
self.first_media_time = None
self.start_time = None
self.end_time = None
self.status = 'active'
self.timeout_wait = 0
def __str__(self):
if self.caller.remote_sdp is None:
src = 'Unknown'
else:
src = '%s:%d' % self.caller.remote_sdp
if self.caller.is_on_hold:
src += ' ON HOLD'
if self.caller.uses_ice:
src += ' (ICE)'
if self.callee.remote_sdp is None:
dst = 'Unknown'
else:
dst = '%s:%d' % self.callee.remote_sdp
if self.callee.is_on_hold:
dst += ' ON HOLD'
if self.callee.uses_ice:
dst += ' (ICE)'
rtp = self.rtp
rtcp = self.rtcp
return '(%s) %s (RTP: %s, RTCP: %s) <-> %s <-> %s <-> %s (RTP: %s, RTCP: %s)' % (
self.media_type, src, rtp.caller.remote, rtcp.caller.remote, rtp.caller.local, rtp.callee.local, dst, rtp.callee.remote, rtcp.callee.remote)
@property
def counters(self):
return self.rtp.counters + self.rtcp.counters
@property
def is_on_hold(self):
return self.caller.is_on_hold or self.callee.is_on_hold
def check_hold(self, party, direction, ip):
previous_hold = self.is_on_hold
party = getattr(self, party)
if direction == 'sendonly' or direction == 'inactive':
party.is_on_hold = True
elif ip == '0.0.0.0':
party.is_on_hold = True
else:
party.is_on_hold = False
if previous_hold and not self.is_on_hold:
for substream in [self.rtp, self.rtcp]:
for subparty in [substream.caller, substream.callee]:
self.status = 'active'
subparty.after_hold()
if not previous_hold and self.is_on_hold:
for substream in [self.rtp, self.rtcp]:
for subparty in [substream.caller, substream.callee]:
self.status = 'on hold'
subparty.before_hold()
def reset(self, party, media_ip, media_port):
self.rtp.reset(party)
self.rtcp.reset(party)
getattr(self, party).remote_sdp = (media_ip, media_port)
def substream_expired(self, substream, reason, timeout_wait):
if substream is self.rtp and self.caller.uses_ice and self.callee.uses_ice:
reason = 'unselected ICE candidate'
self.logger.info('RTP stream expired: {}'.format(reason))
if not substream.caller.got_stun_probing and not substream.callee.got_stun_probing:
self.logger.info('unselected ICE candidate, but no STUN was received')
if substream is self.rtcp:
# Forget about the remote addresses, this will cause any
# re-occurrence of the same traffic to be forwarded again
substream.caller.remote.forget()
substream.caller.listener.protocol.send_packet_count = 0
substream.callee.remote.forget()
substream.callee.listener.protocol.send_packet_count = 0
else:
session = self.session
self.cleanup(reason)
self.timeout_wait = timeout_wait
session.stream_expired(self)
def cleanup(self, status='closed'):
if self.is_alive:
self.is_alive = False
self.status = status
self.caller.cleanup()
self.callee.cleanup()
self.rtp.cleanup()
self.rtcp.cleanup()
self.session = None
self.end_time = time()
class Session(object):
- def __init__(self, manager, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media_list, is_downstream, is_caller_cseq, mark=0):
+ def __init__(self, manager, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media_list, is_downstream, is_caller_cseq, mark=0, caller_ip=None, destination_ip=None):
self.manager = manager
self.dispatcher = dispatcher
self.session_id = base64_encode(hashlib.md5(call_id.encode()).digest()).rstrip(b'=')
self.call_id = call_id
+ self.caller_ip = caller_ip
+ self.destination_ip = destination_ip
self.from_tag = from_tag
self.to_tag = None
self.mark = mark
self.from_uri = from_uri
self.to_uri = to_uri
self.caller_ua = None
self.callee_ua = None
self.cseq = None
self.previous_cseq = None
self.streams = {}
self.start_time = None
self.end_time = None
self.logger = SessionLogger(self)
self.logger.info('created: from-tag {0.from_tag})'.format(self))
self.update_media(cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq)
def update_media(self, cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq):
if self.cseq is None:
old_cseq = (0, 0)
else:
old_cseq = self.cseq
if is_caller_cseq:
cseq = (cseq, old_cseq[1])
if self.to_tag is None and to_tag is not None:
self.to_tag = to_tag
else:
cseq = (old_cseq[0], cseq)
if is_downstream:
party = 'caller'
if self.caller_ua is None:
self.caller_ua = user_agent
else:
party = 'callee'
if self.callee_ua is None:
self.callee_ua = user_agent
if self.cseq is None or cseq > self.cseq:
if not media_list:
return
self.logger.info('got SDP offer')
self.streams[cseq] = new_streams = []
if self.cseq is None:
old_streams = []
else:
old_streams = self.streams[self.cseq]
for media_type, media_ip, media_port, media_direction, media_parameters in media_list:
for old_stream in old_streams:
old_remote = getattr(old_stream, party).remote_sdp
if old_remote is not None:
old_ip, old_port = old_remote
else:
old_ip, old_port = None, None
if old_stream.is_alive and old_stream.media_type == media_type and ((media_ip, media_port) in ((old_ip, old_port), ('0.0.0.0', old_port), (old_ip, 0))):
stream = old_stream
stream.check_hold(party, media_direction, media_ip)
if media_port == 0:
self.logger.info('disabled stream: %s', stream)
else:
self.logger.info('retained stream: %s', stream)
break
else:
stream = MediaStream(self, media_type, media_ip, media_port, media_direction, media_parameters, party)
self.logger.info('proposed stream: %s' % stream)
if media_port == 0:
stream.cleanup()
new_streams.append(stream)
if self.previous_cseq is not None:
for stream in self.streams[self.previous_cseq]:
if stream not in self.streams[self.cseq] + new_streams:
stream.cleanup()
self.previous_cseq = self.cseq
self.cseq = cseq
elif self.cseq == cseq:
self.logger.info('got SDP answer')
now = time()
if self.start_time is None:
self.start_time = now
current_streams = self.streams[cseq]
for stream in current_streams:
if stream.start_time is None:
stream.start_time = now
if to_tag is not None and not media_list:
return
if len(media_list) < len(current_streams):
for stream in current_streams[len(media_list):]:
self.logger.info('removed! stream: %s' % stream)
stream.cleanup('rejected')
for stream, (media_type, media_ip, media_port, media_direction, media_parameters) in zip(current_streams, media_list):
if stream.media_type != media_type:
raise ValueError('Media types do not match: %r and %r' % (stream.media_type, media_type))
if media_port == 0:
if stream.is_alive:
self.logger.info('rejected stream: %s' % stream)
else:
self.logger.info('disabled stream: %s' % stream)
stream.cleanup('rejected')
continue
stream.check_hold(party, media_direction, media_ip)
party_info = getattr(stream, party)
party_info.uses_ice = (media_parameters.get('ice', 'no') == 'yes')
if party_info.remote_sdp is None or party_info.remote_sdp[0] == '0.0.0.0':
party_info.remote_sdp = (media_ip, media_port)
self.logger.info('accepted stream: %s' % stream)
else:
if party_info.remote_sdp[1] != media_port or (party_info.remote_sdp[0] != media_ip != '0.0.0.0'):
stream.reset(party, media_ip, media_port)
self.logger.info('updating stream: %s' % stream)
else:
self.logger.info('retained stream: %s' % stream)
if self.previous_cseq is not None:
for stream in [stream for stream in self.streams[self.previous_cseq] if stream not in current_streams]:
self.logger.info('removing stream: %s' % stream)
stream.cleanup()
else:
self.logger.info('got old CSeq %d:%d, ignoring' % cseq)
def get_local_media(self, is_downstream, cseq, is_caller_cseq):
if is_caller_cseq:
pos = 0
else:
pos = 1
try:
cseq = max(key for key in list(self.streams.keys()) if key[pos] == cseq)
except ValueError:
return None
if is_downstream:
retval = [(stream.status in ['active', 'on hold']) and tuple(stream.rtp.callee.local) or (stream.rtp.callee.local.host, 0) for stream in self.streams[cseq]]
else:
retval = [(stream.status in ['active', 'on hold']) and tuple(stream.rtp.caller.local) or (stream.rtp.caller.local.host, 0) for stream in self.streams[cseq]]
+ self.logger.info('SDP media ip for %s set to %s:%d' % ("callee" if is_downstream else "caller", retval[0][0], retval[0][1]))
return retval
def cleanup(self):
self.end_time = time()
for cseq in [self.previous_cseq, self.cseq]:
if cseq is not None:
for stream in self.streams[cseq]:
stream.cleanup()
def stream_expired(self, stream):
active_streams = set()
for cseq in [self.previous_cseq, self.cseq]:
if cseq is not None:
active_streams.update({stream for stream in self.streams[cseq] if stream.is_alive})
if len(active_streams) == 0:
self.manager.session_expired(self.call_id, self.from_tag)
@property
def duration(self):
if self.start_time is not None:
if self.end_time is not None:
return int(self.end_time - self.start_time)
else:
return int(time() - self.start_time)
else:
return 0
@property
def relayed_bytes(self):
return sum(stream.counters.relayed_bytes for stream in set(chain(*iter(self.streams.values()))))
@property
def statistics(self):
all_streams = set(chain(*iter(self.streams.values())))
attributes = ('call_id', 'from_tag', 'from_uri', 'to_tag', 'to_uri', 'start_time', 'duration')
stats = dict((name, getattr(self, name)) for name in attributes)
stats['caller_ua'] = self.caller_ua or 'Unknown'
stats['callee_ua'] = self.callee_ua or 'Unknown'
stats['streams'] = streams = []
stream_attributes = ('media_type', 'status', 'timeout_wait')
streams_to_sort = []
for stream in all_streams:
try:
if stream and stream.start_time:
streams_to_sort.append(stream)
except AttributeError:
pass
for stream in sorted(streams_to_sort, key=attrgetter('start_time')): # type: MediaStream
info = dict((name, getattr(stream, name)) for name in stream_attributes)
info['caller_codec'] = stream.rtp.caller.codec
info['callee_codec'] = stream.rtp.callee.codec
if stream.start_time is None:
info['start_time'] = info['end_time'] = None
elif self.start_time is None:
info['start_time'] = info['end_time'] = 0
else:
info['start_time'] = max(int(stream.start_time - self.start_time), 0)
if stream.status == 'rejected':
info['end_time'] = info['start_time']
else:
if stream.end_time is None:
info['end_time'] = stats['duration']
else:
info['end_time'] = min(int(stream.end_time - self.start_time), self.duration)
if stream.first_media_time is None:
info['post_dial_delay'] = None
else:
info['post_dial_delay'] = stream.first_media_time - stream.create_time
caller = stream.rtp.caller
callee = stream.rtp.callee
info.update(stream.counters)
info['caller_local'] = str(caller.local)
info['callee_local'] = str(callee.local)
info['caller_remote'] = str(caller.remote)
info['callee_remote'] = str(callee.remote)
streams.append(info)
return stats
class SessionManager(Logger):
@implementer(IReadDescriptor)
def __init__(self, relay, start_port, end_port):
self.relay = relay
self.ports = deque((i, i + 1) for i in range(start_port, end_port, 2))
self.bad_ports = deque()
self.sessions = {}
self.watcher = _conntrack.ExpireWatcher()
self.active_byte_counter = 0 # relayed byte counter for sessions active during last speed measurement
self.closed_byte_counter = 0 # relayed byte counter for sessions closed after last speed measurement
self.bps_relayed = 0
if RelayConfig.traffic_sampling_period > 0:
self.speed_calculator = RecurrentCall(RelayConfig.traffic_sampling_period, self._measure_speed)
else:
self.speed_calculator = None
reactor.addReader(self)
def _measure_speed(self):
start_time = time()
current_byte_counter = sum(session.relayed_bytes for session in self.sessions.values())
self.bps_relayed = 8 * (current_byte_counter + self.closed_byte_counter - self.active_byte_counter) / RelayConfig.traffic_sampling_period
self.active_byte_counter = current_byte_counter
self.closed_byte_counter = 0
us_taken = int((time() - start_time) * 1000000)
if us_taken > 10000:
log.warning('Aggregate speed calculation time exceeded 10ms: %d us for %d sessions' % (us_taken, len(self.sessions)))
return KeepRunning
# implemented for IReadDescriptor
def fileno(self):
return self.watcher.fd
def doRead(self):
stream = self.watcher.read()
if stream:
stream.expired_func()
def connectionLost(self, reason):
reactor.removeReader(self)
# port management
def get_ports(self):
if len(self.bad_ports) > len(self.ports):
log.debug('Excessive amount of bad ports, doing cleanup')
self.ports.extend(self.bad_ports)
self.bad_ports = deque()
try:
return self.ports.popleft()
except IndexError:
raise RelayPortsExhaustedError()
def set_bad_ports(self, ports):
self.bad_ports.append(ports)
def free_ports(self, ports):
self.ports.append(ports)
# called by higher level
def _find_session_key(self, call_id, from_tag, to_tag):
key_from = (call_id, from_tag)
if key_from in self.sessions:
return key_from
if to_tag:
key_to = (call_id, to_tag)
if key_to in self.sessions:
return key_to
return None
def has_session(self, call_id, from_tag, to_tag=None, **kw):
return any((call_id, tag) in self.sessions for tag in (from_tag, to_tag) if tag is not None)
def update_session(self, dispatcher, call_id, from_tag, from_uri, to_uri, cseq, user_agent, type, media=[], to_tag=None, **kw):
key = self._find_session_key(call_id, from_tag, to_tag)
+ try:
+ (signaling_ip, destination_ip) = kw['signaling_ip'].split("_")
+ except ValueError:
+ signaling_ip = kw['signaling_ip']
+ destination_ip = None
+
if key:
session = self.sessions[key]
is_downstream = (session.from_tag != from_tag) ^ (type == 'request')
is_caller_cseq = (session.from_tag == from_tag)
session.update_media(cseq, to_tag, user_agent, media, is_downstream, is_caller_cseq)
elif type == 'reply' and not media:
return None
else:
is_downstream = type == 'request'
is_caller_cseq = True
- session = Session(self, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media, is_downstream, is_caller_cseq)
+ session = Session(self, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media, is_downstream, is_caller_cseq, caller_ip=signaling_ip, destination_ip=destination_ip)
self.sessions[(call_id, from_tag)] = session
self.relay.add_session(dispatcher)
return session.get_local_media(is_downstream, cseq, is_caller_cseq)
def remove_session(self, call_id, from_tag, to_tag=None, **kw):
key = self._find_session_key(call_id, from_tag, to_tag)
try:
session = self.sessions[key]
except KeyError:
log.warning('The dispatcher tried to remove a session which is no longer present on the relay')
return None
session.logger.info('removed')
session.cleanup()
self.closed_byte_counter += session.relayed_bytes
del self.sessions[key]
reactor.callLater(0, self.relay.remove_session, session.dispatcher)
return session
def session_expired(self, call_id, from_tag):
key = (call_id, from_tag)
try:
session = self.sessions[key]
except KeyError:
log.warning('A session expired but is no longer present on the relay')
return
session.logger.info('expired')
session.cleanup()
self.closed_byte_counter += session.relayed_bytes
del self.sessions[key]
self.relay.session_expired(session)
self.relay.remove_session(session.dispatcher)
def cleanup(self):
if self.speed_calculator is not None:
self.speed_calculator.cancel()
for key in list(self.sessions.keys()):
self.session_expired(*key)
@property
def statistics(self):
return [session.statistics for session in self.sessions.values()]
@property
def stream_count(self):
stream_count = {}
for session in self.sessions.values():
for stream in set(chain(*iter(session.streams.values()))):
if stream.is_alive:
stream_count[stream.media_type] = stream_count.get(stream.media_type, 0) + 1
return stream_count

File Metadata

Mime Type
text/x-diff
Expires
Tue, Nov 26, 4:26 AM (1 d, 14 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3414123
Default Alt Text
(64 KB)

Event Timeline