Page MenuHomePhabricator

No OneTemporary

diff --git a/eventlib/api.py b/eventlib/api.py
index 9025b1a..4d4e735 100644
--- a/eventlib/api.py
+++ b/eventlib/api.py
@@ -1,592 +1,594 @@
# @author Bob Ippolito
#
# Copyright (c) 2005-2006, Bob Ippolito
# Copyright (c) 2007, Linden Research, Inc.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys
import socket
import string
import linecache
import inspect
import threading
from eventlib.support import greenlets as greenlet
__all__ = [
'call_after', 'exc_after', 'getcurrent', 'get_default_hub', 'get_hub',
'GreenletExit', 'kill', 'sleep', 'spawn', 'spew', 'switch',
'ssl_listener', 'tcp_listener', 'tcp_server', 'trampoline',
'unspew', 'use_hub', 'with_timeout', 'timeout']
def switch(coro, result=None, exc=None):
    """Resume *coro*: throw *exc* into it when one is given, otherwise
    switch to it passing *result*.  Returns whatever the call returns.
    """
    if exc is None:
        return coro.switch(result)
    return coro.throw(exc)
Greenlet = greenlet.greenlet
class TimeoutError(Exception):
    """Raised when an asynchronous operation does not complete in time."""
_threadlocal = threading.local()
def tcp_listener(address, backlog=50):
    """
    Open a green TCP socket listening on the (ip, port) *address*.

    *backlog* is forwarded to ``util.socket_bind_and_listen()``.
    The returned socket is the one to call ``accept()`` on.  Usually it is
    handed to ``tcp_server()``, which accepts connections forever and
    spawns a greenlet for each incoming connection.
    """
    from eventlib import greenio, util
    listener = greenio.GreenSocket(util.tcp_socket())
    util.socket_bind_and_listen(listener, address, backlog=backlog)
    return listener
def ssl_listener(address, certificate, private_key):
    """Open a listening TCP socket on the (ip, port) *address* that speaks SSL.

    *certificate* and *private_key* are the filenames of the certificate
    and private key files used for the server side of the SSL socket.
    The returned socket is the one to call ``accept()`` on; it is marked
    with ``is_secure = True``.  Usually it is handed to ``tcp_server()``,
    which accepts connections forever and spawns a greenlet for each
    incoming connection.
    """
    from eventlib import greenio, util
    from eventlib.green import ssl
    plain = greenio.GreenSocket(util.tcp_socket())
    secure = ssl.wrap_socket(plain, keyfile=private_key, certfile=certificate, server_side=True)
    util.socket_bind_and_listen(secure, address)
    secure.is_secure = True
    return secure
def connect_tcp(address, localaddr=None):
    """
    Connect a green TCP socket to *address* (host, port) and return it.

    When *localaddr* (host, port) is given, the socket is bound to it
    before connecting.
    """
    from eventlib import greenio, util
    conn = greenio.GreenSocket(util.tcp_socket())
    if localaddr is not None:
        conn.bind(localaddr)
    conn.connect(address)
    return conn
def tcp_server(listensocket, server, *args, **kw):
    r"""
    Given a socket, accept connections forever, spawning greenlets
    and executing *server* for each new incoming connection.
    When *listensocket* is closed, the ``tcp_server()`` greenlet will end.

    listensocket
        The socket from which to accept connections.
    server
        The callable to call when a new connection is made; it receives
        the result of ``listensocket.accept()`` as its first argument.
    \*args
        The positional arguments to pass to *server*.
    \*\*kw
        The keyword arguments to pass to *server*.

    A ``socket.error`` whose first argument is 32 (broken pipe) ends the
    loop quietly; any other ``socket.error`` propagates.  The listening
    socket is closed in every case.
    """
    # errno value that trampoline() throws when a descriptor is closed
    broken_pipe = 32
    print("tcpserver spawning %s on %s" % (server, listensocket.getsockname()))
    try:
        try:
            while True:
                spawn(server, listensocket.accept(), *args, **kw)
        except socket.error as e:
            # Broken pipe means it was shutdown.  Read the code through
            # e.args rather than indexing the exception object itself.
            if not e.args or e.args[0] != broken_pipe:
                raise
    finally:
        listensocket.close()
def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=TimeoutError):
    """Suspend the current coroutine until the given socket object or file
    descriptor is ready to *read*, ready to *write*, or the specified
    *timeout* elapses, depending on the arguments given.

    Pass *read* ``=True`` to wait for readability and *write* ``=True`` to
    wait for writability.  *timeout* is in seconds.  If it elapses first,
    *timeout_exc* is thrown into the waiting coroutine instead of
    ``trampoline()`` returning normally.

    *fd* may be an object with a ``fileno()`` method or the descriptor
    itself.  Returns whatever ``hub.switch()`` returns.
    """
    timer = None
    hub = get_hub()
    waiter = greenlet.getcurrent()
    assert hub.greenlet is not waiter, 'do not call blocking functions from the mainloop'
    fileno = getattr(fd, 'fileno', lambda: fd)()

    def _do_close(_d, error=None):
        # XXX convert to socket.error
        if error is not None:
            exc = getattr(error, 'value', error)
        else:
            exc = socket.error(32, 'Broken pipe')
        waiter.throw(exc)

    def cb(d):
        waiter.switch()
        # With TwistedHub the descriptor is actually an object
        # (socket_rwdescriptor) which stores this callback.  If this callback
        # kept a reference to the socket instance (fd), the descriptor would
        # reference that instance too, and the socket would not be collected
        # after the greenlet exits.  Nobody uses the result of this switch,
        # so fd is deliberately not passed here.  If it is ever needed, use
        # an indirect reference that is discarded right after the switch.

    if timeout is not None:
        timer = hub.schedule_call(timeout, waiter.throw, timeout_exc)
    try:
        # a callback is only registered for the directions actually requested
        descriptor = hub.add_descriptor(fileno, read and cb, write and cb, _do_close)
        try:
            return hub.switch()
        finally:
            hub.remove_descriptor(descriptor)
    finally:
        if timer is not None:
            timer.cancel()
def get_fileno(obj):
    """Return ``obj.fileno()`` when *obj* has a ``fileno`` attribute;
    otherwise *obj* must already be an integer descriptor and is returned
    unchanged.
    """
    try:
        method = obj.fileno
    except AttributeError:
        pass
    else:
        return method()
    assert isinstance(obj, (int, long))
    return obj
def select(read_list, write_list, error_list, timeout=None):
    """Cooperative counterpart of ``select.select()``.

    Suspends the current coroutine until one of the given objects
    (anything accepted by ``get_fileno()``) fires, or until *timeout*
    seconds pass.  The callbacks resume the coroutine with a
    ``(readable, writable, errored)`` triple of lists holding the single
    object that fired, or three empty lists on timeout.
    """
    hub = get_hub()
    timer = None
    waiter = greenlet.getcurrent()
    assert hub.greenlet is not waiter, 'do not call blocking functions from the mainloop'

    # fileno -> {'read': obj, 'write': obj, 'error': obj} (only requested keys)
    interest = {}
    for reader in read_list:
        interest[get_fileno(reader)] = {'read' : reader}
    for kind, objects in (('write', write_list), ('error', error_list)):
        for obj in objects:
            interest.setdefault(get_fileno(obj), {})[kind] = obj

    registered = []

    def on_read(d):
        original = interest[get_fileno(d)]['read']
        waiter.switch(([original], [], []))

    def on_write(d):
        original = interest[get_fileno(d)]['write']
        waiter.switch(([], [original], []))

    def on_error(d, _err=None):
        original = interest[get_fileno(d)]['error']
        waiter.switch(([], [], [original]))

    def on_timeout():
        waiter.switch(([], [], []))

    if timeout is not None:
        timer = hub.schedule_call(timeout, on_timeout)
    try:
        for fileno, wanted in interest.items():
            registered.append(hub.add_descriptor(
                fileno,
                on_read if wanted.get('read') is not None else False,
                on_write if wanted.get('write') is not None else False,
                on_error if wanted.get('error') is not None else False))
        try:
            return hub.switch()
        finally:
            for descriptor in registered:
                hub.remove_descriptor(descriptor)
    finally:
        if timer is not None:
            timer.cancel()
def _spawn_startup(cb, args, kw, cancel=None):
    """Body of a freshly created greenlet: first yield back to the parent,
    then run ``cb(*args, **kw)`` once switched to again.

    If the initial switch raises instead of returning (e.g. the greenlet
    is killed before it starts), *cancel* is called when provided.
    """
    resumed = False
    try:
        greenlet.getcurrent().parent.switch()
        resumed = True
    finally:
        if not resumed and cancel is not None:
            cancel()
    return cb(*args, **kw)
def _spawn(g):
    """Make the current greenlet the parent of *g*, then switch into *g*."""
    caller = greenlet.getcurrent()
    g.parent = caller
    g.switch()
def spawn(function, *args, **kwds):
    """Create a new coroutine, or cooperative thread of control, in which
    *function* is executed.

    *function* is called with *args* and keyword arguments *kwds* and keeps
    control unless it cooperatively yields by calling a socket method or
    ``sleep()``.

    ``spawn()`` returns the new greenlet to the caller immediately;
    *function* itself is called in a future main loop iteration.

    An uncaught exception in *function* or any child will terminate the new
    coroutine with a log message.
    """
    # killable
    coro = Greenlet(_spawn_startup)
    starter = get_hub().schedule_call_global(0, _spawn, coro)
    # Prime the greenlet; it yields straight back and keeps starter.cancel
    # so the scheduled start is cancelled if it is killed before running.
    coro.switch(function, args, kwds, starter.cancel)
    return coro
def kill(g, *throw_args):
    """Schedule ``g.throw(*throw_args)`` on the hub and, unless called from
    the hub's own greenlet, yield with ``sleep(0)`` so it can run.
    """
    hub = get_hub()
    hub.schedule_call(0, g.throw, *throw_args)
    in_mainloop = getcurrent() is hub.greenlet
    if not in_mainloop:
        sleep(0)
def call_after_global(seconds, function, *args, **kwds):
    """Schedule *function* to be called after *seconds* have elapsed.
    The function will be scheduled even if the current greenlet has exited.

    *seconds* may be an integer, or a float if fractional seconds are
    desired.  *function* is called with *args* and keyword arguments
    *kwds* inside a new greenlet started from the scheduled call.

    The function's return value is discarded. Any uncaught exception will
    be logged.  Returns the timer object from the hub.
    """
    # cancellable
    def _launch():
        runner = Greenlet(_spawn_startup)
        # first switch primes _spawn_startup, second one runs the function
        runner.switch(function, args, kwds)
        runner.switch()
    return get_hub().schedule_call_global(seconds, _launch)
def call_after_local(seconds, function, *args, **kwds):
    """Schedule *function* to be called after *seconds* have elapsed.
    The function will NOT be called if the current greenlet has exited.

    *seconds* may be an integer, or a float if fractional seconds are
    desired.  *function* is called with *args* and keyword arguments
    *kwds* inside a new greenlet started from the scheduled call.

    The function's return value is discarded. Any uncaught exception will
    be logged.  Returns the timer object from the hub.
    """
    # cancellable
    def _launch():
        runner = Greenlet(_spawn_startup)
        # first switch primes _spawn_startup, second one runs the function
        runner.switch(function, args, kwds)
        runner.switch()
    return get_hub().schedule_call_local(seconds, _launch)
# for compatibility with original eventlib API
call_after = call_after_local
class _SilentException:
pass
class FakeTimer:
    """Stand-in for a timer object: ``cancel()`` does nothing."""

    def cancel(self):
        return None
class timeout:
    """Raise an exception in the block after timeout.

    with timeout(seconds[, exc]):
        ... code block ...

    Assuming code block is yielding (i.e. gives up control to the hub),
    an exception provided in `exc' argument will be raised
    (TimeoutError if `exc' is omitted).

    When exc is None, code block is interrupted silently.
    When seconds is None, no timer is armed at all.
    """

    def __init__(self, seconds, *throw_args):
        self.seconds = seconds
        if seconds is None:
            # No timer will be armed, but __exit__ still consults
            # throw_args when a silent exception from an enclosing
            # timeout passes through, so it must always exist.
            self.throw_args = ()
        elif not throw_args:
            self.throw_args = (TimeoutError(), )
        elif throw_args == (None, ):
            self.throw_args = (_SilentException(), )
        else:
            self.throw_args = throw_args

    def __enter__(self):
        if self.seconds is None:
            self.timer = FakeTimer()
        else:
            self.timer = exc_after(self.seconds, *self.throw_args)
        return self.timer

    def __exit__(self, typ, value, tb):
        self.timer.cancel()
        # Swallow only the silent marker that this very instance scheduled.
        if typ is _SilentException and value in self.throw_args:
            return True
def with_timeout(seconds, func, *args, **kwds):
    r"""Call a (yielding) function under a timeout; if it does not return
    before the timeout fires, cancel it and return a flag value.

    seconds
        (int or float) seconds before timeout occurs
    func
        the callable to execute with a timeout; must be one of the functions
        that implicitly or explicitly yields
    \*args, \*\*kwds
        (positional, keyword) arguments to pass to *func*
    timeout_value=
        value to return if timeout occurs (default raise ``TimeoutError``)

    **Returns**:

    Value returned by *func* if *func* returns before *seconds*, else
    *timeout_value* if provided, else raise ``TimeoutError``

    **Raises**:

    Any exception raised by *func*, and ``TimeoutError`` if *func* times out
    and no ``timeout_value`` has been provided.

    **Example**::

        data = with_timeout(30, httpc.get, 'http://www.google.com/', timeout_value="")

    Here *data* is either the result of the ``get()`` call, or the empty string if
    it took too long to return. Any exception raised by the ``get()`` call is
    passed through to the caller.
    """
    # timeout_value is consumed here and never forwarded to func(); every
    # other keyword argument passes straight through.
    has_timeout_value = "timeout_value" in kwds
    timeout_value = kwds.pop("timeout_value", None)
    marker = TimeoutError()
    timer = exc_after(seconds, marker)
    try:
        try:
            return func(*args, **kwds)
        except TimeoutError as ex:
            # only our own timer's exception is turned into timeout_value
            if has_timeout_value and ex is marker:
                return timeout_value
            raise
    finally:
        timer.cancel()
def exc_after(seconds, *throw_args):
    """Schedule an exception to be raised into the current coroutine
    after *seconds* have elapsed.

    This only works if the current coroutine is yielding, and is generally
    used to set timeouts after which a network operation or series of
    operations will be canceled.

    *throw_args* are passed to the current greenlet's ``throw()``.
    Returns a timer object with a ``cancel()`` method which should be used to
    prevent the exception if the operation completes successfully.

    See also ``with_timeout()`` that encapsulates the idiom below.

    Example::

        def read_with_timeout():
            timer = api.exc_after(30, RuntimeError())
            try:
                httpc.get('http://www.google.com/')
            except RuntimeError:
                print "Timed out!"
            else:
                timer.cancel()
    """
    # call_after() obtains the hub itself, so no lookup is needed here.
    return call_after(seconds, getcurrent().throw, *throw_args)
def get_default_hub():
    """Pick the default hub module from the multiplexing facilities that
    are available: ``twistedr`` if a twisted reactor is already imported,
    else ``poll`` when ``select.poll`` exists, else ``selects``.
    """
    if 'twisted.internet.reactor' in sys.modules:
        from eventlib.hubs import twistedr
        return twistedr

    import select as select_module
    if not hasattr(select_module, 'poll'):
        from eventlib.hubs import selects as hub_module
    else:
        from eventlib.hubs import poll as hub_module
    return hub_module
def use_hub(mod=None):
    """Use the module *mod*, containing a class called Hub, as the
    event hub. Usually not required; the default hub is usually fine.

    *mod* may also be the name of a module inside ``eventlib.hubs``, or an
    object without a ``Hub`` attribute, which is then used as the hub
    factory itself.  Any hub already created for this thread is dropped.
    """
    if mod is None:
        mod = get_default_hub()
    if hasattr(_threadlocal, 'hub'):
        del _threadlocal.hub
    if isinstance(mod, str):
        mod = __import__('eventlib.hubs.' + mod, globals(), locals(), ['Hub'])
    _threadlocal.Hub = mod.Hub if hasattr(mod, 'Hub') else mod
def get_hub():
    """Return this thread's event hub, creating it on first use (and
    selecting a hub class via ``use_hub()`` if none was chosen yet).
    """
    try:
        return _threadlocal.hub
    except AttributeError:
        pass
    if not hasattr(_threadlocal, 'Hub'):
        use_hub()
    hub = _threadlocal.Hub()
    _threadlocal.hub = hub
    return hub
def sleep(seconds=0):
    """Yield control to another eligible coroutine until at least *seconds*
    have elapsed.

    *seconds* may be an integer, or a float if fractional seconds are
    desired. ``sleep(0)`` is the canonical way of expressing a cooperative
    yield: when looping over a large list doing an expensive calculation
    without calling any socket methods, call ``sleep(0)`` occasionally,
    otherwise nothing else will run.
    """
    hub = get_hub()
    me = greenlet.getcurrent()
    assert hub.greenlet is not me, 'do not call blocking functions from the mainloop'
    wakeup = hub.schedule_call(seconds, me.switch)
    try:
        hub.switch()
    finally:
        wakeup.cancel()
getcurrent = greenlet.getcurrent
GreenletExit = greenlet.GreenletExit
class Spew(object):
    """Trace function object for ``sys.settrace()`` that prints every
    executed source line as ``module:lineno: source``.

    trace_names
        ``None`` to print lines from every module, otherwise a container
        of module names (``__name__``) whose lines are printed.
    show_values
        when true, also print ``name=value`` for each token of the line
        that is found in the frame's globals or locals.
    """

    def __init__(self, trace_names=None, show_values=True):
        self.trace_names = trace_names
        self.show_values = show_values

    def __call__(self, frame, event, arg):
        """Handle one trace event; always returns ``self`` so tracing
        continues in nested scopes."""
        if event == 'line':
            lineno = frame.f_lineno
            if '__file__' in frame.f_globals:
                filename = frame.f_globals['__file__']
                if filename.endswith(('.pyc', '.pyo')):
                    # map compiled file name back to the .py source
                    filename = filename[:-1]
                name = frame.f_globals['__name__']
                line = linecache.getline(filename, lineno)
            else:
                name = '[unknown]'
                line = None
                try:
                    # getsourcelines() returns (lines, first_lineno);
                    # first_lineno is 0 for module-level code.
                    lines, first = inspect.getsourcelines(frame)
                except IOError:
                    pass
                else:
                    index = lineno - max(first, 1)
                    if 0 <= index < len(lines):
                        line = lines[index]
                if line is None:
                    line = 'Unknown code named [%s]. VM instruction #%d' % (
                        frame.f_code.co_name, frame.f_lasti)
            if self.trace_names is None or name in self.trace_names:
                print('%s:%s: %s' % (name, lineno, line.rstrip()))
                if not self.show_values:
                    return self
                details = '\t'
                # split the line on spaces, commas, dots and parentheses
                for separator in ' ,.()':
                    line = line.replace(separator, '\0')
                for tok in line.split('\0'):
                    if tok in frame.f_globals:
                        details += '%s=%r ' % (tok, frame.f_globals[tok])
                    if tok in frame.f_locals:
                        details += '%s=%r ' % (tok, frame.f_locals[tok])
                if details.strip():
                    print(details)
        return self
def spew(trace_names=None, show_values=False):
    """Install a trace hook which writes incredibly detailed logs
    about what code is being executed to stdout.

    Both arguments are handed to ``Spew``.
    """
    tracer = Spew(trace_names, show_values)
    sys.settrace(tracer)
def unspew():
    """Remove the trace hook installed by spew (clears ``sys.settrace``)."""
    no_tracer = None
    sys.settrace(no_tracer)
def named(name):
    """Return an object given its name.

    The name uses a module-like syntax, eg::

        os.path.join

    or::

        mulib.mu.Resource

    The longest importable dotted prefix is imported, then the remaining
    segments are resolved with ``getattr``.  Raises ``ImportError`` when
    no prefix can be imported and ``AttributeError`` when a segment is
    missing.
    """
    import_err_strings = []
    obj = None
    candidate = name
    while candidate:
        try:
            obj = __import__(candidate)
        except ImportError as err:
            # print 'Import error on %s: %s' % (candidate, err)  # debugging spam
            import_err_strings.append(str(err))
            # drop the last dotted segment and retry
            candidate = candidate.rpartition('.')[0]
        else:
            break
    if obj is None:
        raise ImportError('%s could not be imported. Import errors: %r' % (name, import_err_strings))
    # __import__ returned the top-level package; walk down from there
    for seg in name.split('.')[1:]:
        try:
            obj = getattr(obj, seg)
        except AttributeError:
            dirobj = sorted(dir(obj))
            raise AttributeError('attribute %r missing from %r (%r) %r. Import errors: %r' % (
                seg, obj, dirobj, name, import_err_strings))
    return obj
diff --git a/eventlib/green/socket.py b/eventlib/green/socket.py
index 3197d9a..f47ddaa 100644
--- a/eventlib/green/socket.py
+++ b/eventlib/green/socket.py
@@ -1,97 +1,113 @@
__socket = __import__('socket')
for var in __socket.__all__:
exec "%s = __socket.%s" % (var, var)
_fileobject = __socket._fileobject
from eventlib.api import get_hub
-from eventlib.greenio import GreenSocket as socket, GreenSSL as _GreenSSL
+from eventlib.greenio import GreenSocket as socket
from eventlib.greenio import socketpair, fromfd
+import warnings
+
def gethostbyname(name):
    """Resolve *name* without blocking the hub.

    On first call this module-level name is rebound to the twisted or the
    thread-pool implementation, depending on whether the hub sets
    ``uses_twisted_reactor``; the call is then delegated to it.
    """
    if getattr(get_hub(), 'uses_twisted_reactor', None):
        resolver = _gethostbyname_twisted
    else:
        resolver = _gethostbyname_tpool
    globals()['gethostbyname'] = resolver
    return resolver(name)
def _gethostbyname_twisted(name):
    """Resolve *name* via ``reactor.resolve()`` and wait for the result
    with ``block_on``."""
    from twisted.internet import reactor
    from eventlib.twistedutil import block_on as _block_on
    pending = reactor.resolve(name)
    return _block_on(pending)
def _gethostbyname_tpool(name):
    """Run the real ``socket.gethostbyname`` through ``tpool.execute``."""
    from eventlib import tpool
    blocking_call = __socket.gethostbyname
    return tpool.execute(blocking_call, name)
def getaddrinfo(*args, **kw):
    """Non-blocking ``getaddrinfo``.

    On first call this module-level name is rebound to the twisted or the
    thread-pool implementation, depending on whether the hub sets
    ``uses_twisted_reactor``; the call is then delegated to it.
    """
    if getattr(get_hub(), 'uses_twisted_reactor', None):
        resolver = _getaddrinfo_twisted
    else:
        resolver = _getaddrinfo_tpool
    globals()['getaddrinfo'] = resolver
    return resolver(*args, **kw)
def _getaddrinfo_twisted(*args, **kw):
    """Run the real ``socket.getaddrinfo`` via ``deferToThread`` and wait
    for the result with ``block_on``."""
    from twisted.internet.threads import deferToThread
    from eventlib.twistedutil import block_on as _block_on
    pending = deferToThread(__socket.getaddrinfo, *args, **kw)
    return _block_on(pending)
def _getaddrinfo_tpool(*args, **kw):
    """Run the real ``socket.getaddrinfo`` through ``tpool.execute``."""
    from eventlib import tpool
    blocking_call = __socket.getaddrinfo
    return tpool.execute(blocking_call, *args, **kw)
# XXX there are a few more blocking functions in socket
# XXX having a hub-independent way to access thread pool would be nice
_GLOBAL_DEFAULT_TIMEOUT = __socket._GLOBAL_DEFAULT_TIMEOUT
def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT):
    """Connect to *address* and return the socket object.

    Convenience function.  *address* is a 2-tuple ``(host, port)``; each
    result of ``getaddrinfo`` is tried in turn and the first socket that
    connects is returned.  When *timeout* is supplied it is set on the
    socket before connecting; otherwise the socket's timeout is not
    touched.  If every attempt fails, the last error is raised; if there
    was nothing to try, ``error`` is raised with an explanatory message.
    """
    host, port = address
    last_error = None
    for af, socktype, proto, canonname, sa in getaddrinfo(host, port, 0, SOCK_STREAM):
        sock = None
        try:
            sock = socket(af, socktype, proto)
            if timeout is not _GLOBAL_DEFAULT_TIMEOUT:
                sock.settimeout(timeout)
            sock.connect(sa)
            return sock
        except error as exc:
            last_error = exc
            if sock is not None:
                sock.close()
    if last_error is None:
        raise error("getaddrinfo returns an empty list")
    raise last_error
-def ssl(sock, certificate=None, private_key=None):
- from OpenSSL import SSL
- context = SSL.Context(SSL.SSLv23_METHOD)
- if certificate is not None:
- context.use_certificate_file(certificate)
- if private_key is not None:
- context.use_privatekey_file(private_key)
- context.set_verify(SSL.VERIFY_NONE, lambda *x: True)
-
- ## TODO only do this on client sockets? how?
- connection = SSL.Connection(context, sock)
- connection.set_connect_state()
- return _GreenSSL(connection)
-
-sslerror = __socket.sslerror
+try:
+ from eventlib.green import ssl as ssl_module
+except ImportError:
+ # no ssl support
+ pass
+else:
+ # some constants the SSL module exports but not in __all__
+ from eventlib.green.ssl import (RAND_add,
+ RAND_egd,
+ RAND_status,
+ SSL_ERROR_ZERO_RETURN,
+ SSL_ERROR_WANT_READ,
+ SSL_ERROR_WANT_WRITE,
+ SSL_ERROR_WANT_X509_LOOKUP,
+ SSL_ERROR_SYSCALL,
+ SSL_ERROR_SSL,
+ SSL_ERROR_WANT_CONNECT,
+ SSL_ERROR_EOF,
+ SSL_ERROR_INVALID_ERROR_CODE)
+ try:
+ sslerror = __socket.sslerror
+ __socket.ssl
+ def ssl(sock, certificate=None, private_key=None):
+ warnings.warn("socket.ssl() is deprecated. Use ssl.wrap_socket() instead.", DeprecationWarning, stacklevel=2)
+ return ssl_module.sslwrap_simple(sock, private_key, certificate)
+ except AttributeError:
+ # if the real socket module doesn't have the ssl method or sslerror
+ # exception, we don't emulate them
+ pass
diff --git a/eventlib/green/ssl.py b/eventlib/green/ssl.py
new file mode 100644
index 0000000..6439957
--- /dev/null
+++ b/eventlib/green/ssl.py
@@ -0,0 +1,259 @@
+__ssl = __import__('ssl')
+for var in (var for var in dir(__ssl) if not var.startswith('__')):
+ exec "%s = __ssl.%s" % (var, var)
+del var
+
+time = __import__('time')
+from eventlib.api import trampoline
+from eventlib.greenio import set_nonblocking, GreenSocket, CONNECT_ERR, CONNECT_SUCCESS, BLOCKING_ERR
+orig_socket = __import__('socket')
+socket = orig_socket.socket
+
+__patched__ = ['SSLSocket', 'wrap_socket', 'sslwrap_simple']
+
+
class GreenSSLSocket(__ssl.SSLSocket):
    """ This is a green version of the SSLSocket class from the ssl module added
    in 2.6.  For documentation on it, please see the Python standard
    documentation.

    Python nonblocking ssl objects don't give errors when the other end
    of the socket is closed (they do notice when the other end is shutdown,
    though).  Any write/read operations will simply hang if the socket is
    closed from the other end.  There is no obvious fix for this problem;
    it appears to be a limitation of Python's ssl object implementation.
    A workaround is to set a reasonable timeout on the socket using
    settimeout(), and to close/reopen the connection when a timeout
    occurs at an unexpected juncture in the code.
    """
    # we are inheriting from SSLSocket because its constructor calls
    # do_handshake whose behavior we wish to override
    def __init__(self, sock, *args, **kw):
        """Wrap *sock* (a GreenSocket, or a socket that is first wrapped in
        one); remaining arguments go to ``SSLSocket.__init__``."""
        if not isinstance(sock, GreenSocket):
            sock = GreenSocket(sock)

        self.act_non_blocking = sock.act_non_blocking
        self._timeout = sock.gettimeout()
        super(GreenSSLSocket, self).__init__(sock.fd, *args, **kw)

        # the superclass initializer trashes the methods so we remove
        # the local-object versions of them and let the actual class
        # methods shine through
        try:
            for fn in orig_socket._delegate_methods:
                delattr(self, fn)
        except AttributeError:
            pass

    def settimeout(self, timeout):
        """Remember *timeout* (seconds or None) for later trampolines."""
        self._timeout = timeout

    def gettimeout(self):
        """Return the timeout stored by ``settimeout``/``setblocking``."""
        return self._timeout

    def setblocking(self, flag):
        """Emulate blocking mode: a true *flag* trampolines without a
        timeout, a false one makes calls go straight to the socket."""
        if flag:
            self.act_non_blocking = False
            self._timeout = None
        else:
            self.act_non_blocking = True
            self._timeout = 0.0

    def _call_trampolining(self, func, *a, **kw):
        """Call ``func(*a, **kw)``, retrying after a trampoline whenever it
        raises SSL_ERROR_WANT_READ / SSL_ERROR_WANT_WRITE.  Other
        ``SSLError`` values propagate.  In non-blocking mode *func* is
        called exactly once."""
        if self.act_non_blocking:
            return func(*a, **kw)
        else:
            while True:
                try:
                    return func(*a, **kw)
                except SSLError as e:
                    if e.args[0] == SSL_ERROR_WANT_READ:
                        trampoline(self,
                                   read=True,
                                   timeout=self.gettimeout(),
                                   timeout_exc=SSLError('timed out'))
                    elif e.args[0] == SSL_ERROR_WANT_WRITE:
                        trampoline(self,
                                   write=True,
                                   timeout=self.gettimeout(),
                                   timeout_exc=SSLError('timed out'))
                    else:
                        raise

    def write(self, data):
        """Write DATA to the underlying SSL channel.  Returns
        number of bytes of DATA actually transmitted."""
        return self._call_trampolining(
            super(GreenSSLSocket, self).write, data)

    def read(self, len=1024):
        """Read up to LEN bytes and return them.
        Return zero-length string on EOF."""
        return self._call_trampolining(
            super(GreenSSLSocket, self).read, len)

    def send (self, data, flags=0):
        if self._sslobj:
            return self._call_trampolining(
                super(GreenSSLSocket, self).send, data, flags)
        else:
            trampoline(self, write=True, timeout_exc=SSLError('timed out'))
            return socket.send(self, data, flags)

    def sendto (self, data, addr, flags=0):
        # *NOTE: gross, copied code from ssl.py because it's not factored well enough to be used as-is
        if self._sslobj:
            raise ValueError("sendto not allowed on instances of %s" %
                             self.__class__)
        else:
            trampoline(self, write=True, timeout_exc=SSLError('timed out'))
            return socket.sendto(self, data, addr, flags)

    def sendall (self, data, flags=0):
        # *NOTE: gross, copied code from ssl.py because it's not factored well enough to be used as-is
        if self._sslobj:
            if flags != 0:
                raise ValueError(
                    "non-zero flags not allowed in calls to sendall() on %s" %
                    self.__class__)
            amount = len(data)
            count = 0
            while (count < amount):
                v = self.send(data[count:])
                count += v
            return amount
        else:
            trampoline(self, write=True, timeout_exc=SSLError('timed out'))
            return socket.sendall(self, data, flags)

    def recv(self, buflen=1024, flags=0):
        # *NOTE: gross, copied code from ssl.py because it's not factored well enough to be used as-is
        if self._sslobj:
            if flags != 0:
                raise ValueError(
                    "non-zero flags not allowed in calls to recv() on %s" %
                    self.__class__)
            read = self.read(buflen)
            return read
        else:
            return socket.recv(self, buflen, flags)

    def recv_into (self, buffer, nbytes=None, flags=0):
        if not self.act_non_blocking:
            trampoline(self, read=True, timeout=self.gettimeout(), timeout_exc=SSLError('timed out'))
        return super(GreenSSLSocket, self).recv_into(buffer, nbytes, flags)

    def recvfrom (self, addr, buflen=1024, flags=0):
        # NOTE(review): the leading *addr* parameter presumably mirrors the
        # Python 2.6 ssl.SSLSocket.recvfrom signature -- confirm against
        # the stdlib version actually targeted.
        if not self.act_non_blocking:
            trampoline(self, read=True, timeout=self.gettimeout(), timeout_exc=SSLError('timed out'))
        return super(GreenSSLSocket, self).recvfrom(addr, buflen, flags)

    def recvfrom_into (self, buffer, nbytes=None, flags=0):
        if not self.act_non_blocking:
            trampoline(self, read=True, timeout=self.gettimeout(), timeout_exc=SSLError('timed out'))
        return super(GreenSSLSocket, self).recvfrom_into(buffer, nbytes, flags)

    def unwrap(self):
        """Unwrap the SSL layer and return the result as a GreenSocket."""
        return GreenSocket(super(GreenSSLSocket, self).unwrap())

    def do_handshake(self):
        """Perform a TLS/SSL handshake."""
        return self._call_trampolining(
            super(GreenSSLSocket, self).do_handshake)

    def _socket_connect(self, addr):
        """Connect the plain socket to *addr*, trampolining while the
        connect is in progress.  Raises ``SSLError('timed out')`` when a
        timeout is set and expires first."""
        real_connect = socket.connect
        if self.act_non_blocking:
            return real_connect(self, addr)
        else:
            # *NOTE: gross, copied code from greenio because it's not factored
            # well enough to reuse
            if self.gettimeout() is None:
                while True:
                    try:
                        return real_connect(self, addr)
                    except orig_socket.error as e:
                        if e.args[0] in CONNECT_ERR:
                            trampoline(self, write=True)
                        elif e.args[0] in CONNECT_SUCCESS:
                            return
                        else:
                            raise
            else:
                end = time.time() + self.gettimeout()
                while True:
                    try:
                        # Return as soon as connect() succeeds; falling
                        # through would issue a second connect() and could
                        # report a timeout for an established connection.
                        return real_connect(self, addr)
                    except orig_socket.error as e:
                        if e.args[0] in CONNECT_ERR:
                            trampoline(self, write=True, timeout=end-time.time(), timeout_exc=SSLError('timed out'))
                        elif e.args[0] in CONNECT_SUCCESS:
                            return
                        else:
                            raise
                    if time.time() >= end:
                        raise SSLError('timed out')

    def connect(self, addr):
        """Connects to remote ADDR, and then wraps the connection in
        an SSL channel."""
        # *NOTE: grrrrr copied this code from ssl.py because of the reference
        # to socket.connect which we don't want to call directly
        if self._sslobj:
            raise ValueError("attempt to connect already-connected SSLSocket!")
        self._socket_connect(addr)
        self._sslobj = _ssl.sslwrap(self._sock, False, self.keyfile, self.certfile,
                                    self.cert_reqs, self.ssl_version,
                                    self.ca_certs, self.ciphers)
        if self.do_handshake_on_connect:
            self.do_handshake()

    def accept(self):
        """Accepts a new connection from a remote client, and returns
        a tuple containing that new connection wrapped with a server-side
        SSL channel, and the address of the remote client."""
        # RDW grr duplication of code from greenio
        if self.act_non_blocking:
            newsock, addr = socket.accept(self)
        else:
            while True:
                try:
                    newsock, addr = socket.accept(self)
                    set_nonblocking(newsock)
                    break
                except orig_socket.error as e:
                    if e.args[0] not in BLOCKING_ERR:
                        raise
                    trampoline(self, read=True, timeout=self.gettimeout(), timeout_exc=SSLError('timed out'))

        new_ssl = type(self)(newsock,
                             keyfile=self.keyfile,
                             certfile=self.certfile,
                             server_side=True,
                             cert_reqs=self.cert_reqs,
                             ssl_version=self.ssl_version,
                             ca_certs=self.ca_certs,
                             do_handshake_on_connect=self.do_handshake_on_connect,
                             suppress_ragged_eofs=self.suppress_ragged_eofs)
        return (new_ssl, addr)

    def dup(self):
        raise NotImplementedError("Can't dup an ssl object")
+
+SSLSocket = GreenSSLSocket
+
def wrap_socket(sock, *a, **kw):
    """Green replacement for ``ssl.wrap_socket``: wrap *sock* in a
    ``GreenSSLSocket``, forwarding all other arguments."""
    green_ssl = GreenSSLSocket(sock, *a, **kw)
    return green_ssl
+
+
if hasattr(__ssl, 'sslwrap_simple'):
    def sslwrap_simple(sock, keyfile=None, certfile=None):
        """A replacement for the old socket.ssl function.  Designed
        for compatibility with Python 2.5 and earlier.  Will disappear in
        Python 3.0."""
        # client-side wrapper with no certificate verification
        options = dict(keyfile=keyfile,
                       certfile=certfile,
                       server_side=False,
                       cert_reqs=CERT_NONE,
                       ssl_version=PROTOCOL_SSLv23,
                       ca_certs=None)
        return GreenSSLSocket(sock, **options)
diff --git a/eventlib/greenio.py b/eventlib/greenio.py
index c155335..8a7f2c2 100644
--- a/eventlib/greenio.py
+++ b/eventlib/greenio.py
@@ -1,628 +1,534 @@
# Copyright (c) 2005-2006, Bob Ippolito
# Copyright (c) 2007, Linden Research, Inc.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from eventlib.api import trampoline, get_hub
-from eventlib import util
BUFFER_SIZE = 4096
import errno
import os
import sys
import socket
from socket import socket as _original_socket
import time
-from errno import EWOULDBLOCK, EAGAIN
-
-
__all__ = ['GreenSocket', 'GreenFile', 'GreenPipe']
def higher_order_recv(recv_func):
    """Build a green ``recv(self, buflen)`` method on top of *recv_func*.

    *recv_func(fd, buflen)* must return the data read, or ``None`` when
    the read would block.  The generated method:

    * calls ``self.fd.recv`` directly when ``self.act_non_blocking`` is set;
    * serves data from ``self.recvbuffer`` first when it is non-empty;
    * otherwise trampolines until *recv_func* yields data, honouring
      ``self.gettimeout()`` (raising ``socket.timeout`` on expiry) and
      treating an EPIPE ``socket.error`` as end of stream (``''``);
    * adds the number of bytes returned to ``self.recvcount``.
    """
    def recv(self, buflen):
        if self.act_non_blocking:
            return self.fd.recv(buflen)
        buf = self.recvbuffer
        if buf:
            chunk, self.recvbuffer = buf[:buflen], buf[buflen:]
            return chunk
        fd = self.fd
        data = recv_func(fd, buflen)
        if self.gettimeout():
            end = time.time()+self.gettimeout()
        else:
            end = None
        timeout = None
        while data is None:
            try:
                if end:
                    # remaining time until the overall deadline
                    timeout = end - time.time()
                trampoline(fd, read=True, timeout=timeout, timeout_exc=socket.timeout)
            except socket.timeout:
                raise
            except socket.error as e:
                # read the errno through e.args, like the other helpers here
                if e.args and e.args[0] == errno.EPIPE:
                    data = ''
                else:
                    raise
            else:
                data = recv_func(fd, buflen)
        self.recvcount += len(data)
        return data
    return recv
def higher_order_send(send_func):
    """Build a ``send`` method on top of *send_func*.

    *send_func* is called as ``send_func(fd, data)``. Any falsy result
    (``0`` or ``None``) is reported to the caller as ``0`` bytes sent.
    """
    def send(self, data):
        # Non-blocking mode: delegate straight to the real descriptor.
        if self.act_non_blocking:
            return self.fd.send(data)

        sent = send_func(self.fd, data)
        if sent:
            self.sendcount += sent
            return sent
        return 0
    return send
+
# errno values returned by connect_ex(): CONNECT_ERR means the connection is
# still being established, CONNECT_SUCCESS means it is already up.
CONNECT_ERR = (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK)
CONNECT_SUCCESS = (0, errno.EISCONN)
if sys.platform == 'win32':
    # Winsock reports its own codes in addition to the POSIX ones.
    CONNECT_ERR += (errno.WSAEINVAL, errno.WSAEWOULDBLOCK)
    CONNECT_SUCCESS += (errno.WSAEISCONN,)
+
def socket_connect(descriptor, address):
    """Attempt a non-blocking connect of *descriptor* to *address*.

    Returns ``None`` while the connection is still in progress, the
    descriptor itself once connected, and raises ``socket.error`` for any
    other ``connect_ex`` result.
    """
    err = descriptor.connect_ex(address)
    if err in CONNECT_ERR:
        return None
    if err in CONNECT_SUCCESS:
        return descriptor
    raise socket.error(err, errno.errorcode[err])
# errno values meaning "the operation would block, try again later".
BLOCKING_ERR = (errno.EWOULDBLOCK,)
if sys.platform == 'win32':
    BLOCKING_ERR += (errno.WSAEWOULDBLOCK,)
def socket_accept(descriptor):
    """Accept on *descriptor* without blocking.

    Returns whatever ``descriptor.accept()`` returns, or ``None`` when the
    call would block. Any other ``socket.error`` propagates.
    """
    try:
        return descriptor.accept()
    except socket.error as e:
        if e.args[0] not in BLOCKING_ERR:
            raise
        return None
def socket_send(descriptor, data):
    """Send *data* on *descriptor* without blocking.

    Returns the number of bytes accepted by ``descriptor.send``. Returns
    ``0`` when the call would block or the socket reports ``ENOTCONN``.
    Any other ``socket.error`` propagates.
    """
    try:
        return descriptor.send(data)
    except socket.error as e:
        code = e.args[0]
        # The old test was ``BLOCKING_ERR + errno.ENOTCONN``: tuple + int,
        # which raised TypeError on every error. ENOTCONN is compared on
        # its own instead.
        if code == errno.ENOTCONN or code in BLOCKING_ERR:
            return 0
        raise
# winsock sometimes throws ENOTCONN
SOCKET_CLOSED = (errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN)
def socket_recv(descriptor, buflen):
    """Receive up to *buflen* bytes from *descriptor* without blocking.

    Returns the received data, ``None`` when the call would block, or ``''``
    when the error code is one of ``SOCKET_CLOSED``. Any other
    ``socket.error`` propagates.
    """
    try:
        return descriptor.recv(buflen)
    except socket.error as e:
        code = e.args[0]
        if code in BLOCKING_ERR:
            return None
        if code in SOCKET_CLOSED:
            return ''
        raise
def file_recv(fd, buflen):
    """Read up to *buflen* bytes from the file-like object *fd*.

    Returns the data read, ``None`` when the read fails with ``EAGAIN``
    (nothing available yet), or ``''`` for any other ``IOError`` and for a
    ``socket.error`` carrying ``EPIPE``. Other ``socket.error`` values
    propagate.
    """
    try:
        return fd.read(buflen)
    except IOError as e:
        # e.args[0] rather than e[0], matching the socket_* helpers.
        if e.args[0] == errno.EAGAIN:
            return None
        # Every other I/O failure is reported as end-of-stream.
        return ''
    except socket.error as e:
        if e.args[0] == errno.EPIPE:
            return ''
        raise
def file_send(fd, data):
    """Write *data* to the file-like object *fd* and flush it.

    Returns ``len(data)`` on success and ``0`` when the write or flush
    fails with ``IOError``, ``ValueError`` or ``socket.error``.

    Earlier code returned ``0`` only for ``EAGAIN`` and fell off the end
    (returning ``None``) on every other swallowed error, assigning a
    ``written`` local that was never used. All of those paths now return
    ``0`` explicitly.
    """
    try:
        fd.write(data)
        fd.flush()
    except (IOError, ValueError, socket.error):
        # NOTE(review): errors other than EAGAIN/EPIPE were already being
        # swallowed here; that best-effort behavior is kept as is.
        return 0
    return len(data)
def set_nonblocking(fd):
    """Put *fd* into non-blocking mode.

    Uses ``fd.setblocking(0)`` when the object has that method; otherwise
    sets ``O_NONBLOCK`` on ``fd.fileno()`` through ``fcntl``.
    """
    try:
        make_nonblocking = fd.setblocking
    except AttributeError:
        # No setblocking() on this object: flip the flag on the raw
        # descriptor instead.
        import fcntl
        descriptor = fd.fileno()
        current = fcntl.fcntl(descriptor, fcntl.F_GETFL)
        fcntl.fcntl(descriptor, fcntl.F_SETFL, current | os.O_NONBLOCK)
        return
    make_nonblocking(0)
class GreenSocket(object):
    """Socket wrapper whose blocking calls wait on the hub via ``trampoline``.

    The wrapped descriptor is always put into non-blocking mode. Blocking
    behavior is emulated: a call that cannot complete hands control to the
    hub until the descriptor is ready or the timeout expires.
    """
    is_secure = False
    timeout = None

    def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
        """Wrap an existing socket object, or create one from a family.

        When *family_or_realsock* is an integer it is used, with the extra
        arguments, to create a new real socket; otherwise it is taken to be
        the socket object to wrap and no extra arguments are allowed.
        """
        if isinstance(family_or_realsock, (int, long)):
            fd = _original_socket(family_or_realsock, *args, **kwargs)
        else:
            fd = family_or_realsock
            assert not args, args
            assert not kwargs, kwargs

        set_nonblocking(fd)
        self.fd = fd
        # Remembered so close() can still notify the hub after the real
        # descriptor has been closed.
        self._fileno = fd.fileno()
        self.sendcount = 0
        self.recvcount = 0
        self.recvbuffer = ''
        self.closed = False
        self.timeout = socket.getdefaulttimeout()

        # when client calls setblocking(0) or settimeout(0) the socket must
        # act non-blocking
        self.act_non_blocking = False

    @property
    def family(self):
        return self.fd.family

    @property
    def type(self):
        return self.fd.type

    @property
    def proto(self):
        return self.fd.proto

    def accept(self):
        """Accept a connection; returns ``(GreenSocket, address)``."""
        if self.act_non_blocking:
            return self.fd.accept()
        fd = self.fd
        while True:
            res = socket_accept(fd)
            if res is not None:
                client, addr = res
                set_nonblocking(client)
                return type(self)(client), addr
            trampoline(fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout)

    def bind(self, *args, **kw):
        # Rebinding the name on the instance makes later calls go directly
        # to the real socket's method.
        fn = self.bind = self.fd.bind
        return fn(*args, **kw)

    def close(self, *args, **kw):
        """Close the wrapped socket once; later calls are no-ops."""
        if self.closed:
            return
        self.closed = True
        if self.is_secure:
            # *NOTE: This is not quite the correct SSL shutdown sequence.
            # We should actually be checking the return value of shutdown.
            # Note also that this is not the same as calling self.shutdown().
            self.fd.shutdown()

        fn = self.close = self.fd.close
        try:
            res = fn(*args, **kw)
        finally:
            # This will raise socket.error(32, 'Broken pipe') if there's
            # a caller waiting on trampoline (e.g. server on .accept())
            get_hub().exc_descriptor(self._fileno)
        return res

    def connect(self, address):
        """Connect to *address*, raising ``socket.timeout`` on expiry."""
        if self.act_non_blocking:
            return self.fd.connect(address)
        fd = self.fd
        if self.gettimeout() is None:
            while not socket_connect(fd, address):
                trampoline(fd, write=True, timeout_exc=socket.timeout)
        else:
            end = time.time() + self.gettimeout()
            while True:
                if socket_connect(fd, address):
                    return
                if time.time() >= end:
                    raise socket.timeout
                trampoline(fd, write=True, timeout=end-time.time(), timeout_exc=socket.timeout)

    def connect_ex(self, address):
        """Like connect(), but a ``socket.error`` raised while waiting is
        returned as its error code instead of propagating."""
        if self.act_non_blocking:
            return self.fd.connect_ex(address)
        fd = self.fd
        if self.gettimeout() is None:
            while not socket_connect(fd, address):
                try:
                    trampoline(fd, write=True, timeout_exc=socket.timeout)
                except socket.error as ex:
                    return ex.args[0]
        else:
            end = time.time() + self.gettimeout()
            while True:
                if socket_connect(fd, address):
                    return 0
                if time.time() >= end:
                    raise socket.timeout
                try:
                    trampoline(fd, write=True, timeout=end-time.time(), timeout_exc=socket.timeout)
                except socket.error as ex:
                    return ex.args[0]

    def dup(self, *args, **kw):
        """Duplicate the socket; the copy inherits this socket's timeout."""
        sock = self.fd.dup(*args, **kw)
        set_nonblocking(sock)
        newsock = type(self)(sock)
        newsock.settimeout(self.timeout)
        return newsock

    def fileno(self, *args, **kw):
        fn = self.fileno = self.fd.fileno
        return fn(*args, **kw)

    def getpeername(self, *args, **kw):
        fn = self.getpeername = self.fd.getpeername
        return fn(*args, **kw)

    def getsockname(self, *args, **kw):
        fn = self.getsockname = self.fd.getsockname
        return fn(*args, **kw)

    def getsockopt(self, *args, **kw):
        fn = self.getsockopt = self.fd.getsockopt
        return fn(*args, **kw)

    def listen(self, *args, **kw):
        fn = self.listen = self.fd.listen
        return fn(*args, **kw)

    def makefile(self, mode='r', bufsize=-1):
        return socket._fileobject(self.dup(), mode, bufsize)

    def makeGreenFile(self, mode='r', bufsize=-1):
        # mode and bufsize are accepted for signature compatibility only.
        return GreenFile(self.dup())

    recv = higher_order_recv(socket_recv)

    def recvfrom(self, *args):
        if not self.act_non_blocking:
            trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout)
        return self.fd.recvfrom(*args)

    def recvfrom_into(self, *args):
        if not self.act_non_blocking:
            trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout)
        return self.fd.recvfrom_into(*args)

    def recv_into(self, *args):
        if not self.act_non_blocking:
            trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout)
        return self.fd.recv_into(*args)

    send = higher_order_send(socket_send)

    def sendall(self, data):
        """Send all of *data*, waiting for writability between partial sends."""
        tail = self.send(data)
        while tail < len(data):
            trampoline(self.fd, write=True, timeout_exc=socket.timeout)
            tail += self.send(data[tail:])

    def sendto(self, *args):
        trampoline(self.fd, write=True, timeout_exc=socket.timeout)
        return self.fd.sendto(*args)

    def setblocking(self, flag):
        """Switch between emulated blocking mode (no timeout) and
        non-blocking mode (timeout 0.0)."""
        if flag:
            self.act_non_blocking = False
            self.timeout = None
        else:
            self.act_non_blocking = True
            self.timeout = 0.0

    def setsockopt(self, *args, **kw):
        fn = self.setsockopt = self.fd.setsockopt
        return fn(*args, **kw)

    def shutdown(self, *args, **kw):
        if self.is_secure:
            fn = self.shutdown = self.fd.sock_shutdown
        else:
            fn = self.shutdown = self.fd.shutdown
        return fn(*args, **kw)

    def settimeout(self, howlong):
        """Set the timeout: ``None`` blocks forever, ``0`` is non-blocking,
        a positive number blocks for at most that many seconds.

        Raises ``TypeError`` when *howlong* cannot be converted to a float
        and ``ValueError`` when it is negative.
        """
        if howlong is None:
            self.setblocking(True)
            return
        try:
            f = howlong.__float__
        except AttributeError:
            raise TypeError('a float is required')
        howlong = f()
        if howlong < 0.0:
            raise ValueError('Timeout value out of range')
        if howlong == 0.0:
            self.setblocking(howlong)
        else:
            # A positive timeout must also leave non-blocking mode;
            # otherwise a socket that was once set non-blocking would keep
            # bypassing the hub and ignore the new timeout.
            self.act_non_blocking = False
            self.timeout = howlong

    def gettimeout(self):
        return self.timeout
def read(self, size=None):
    """Read from ``self.sock``; bound as a method via ``read = read``.

    With ``size=None`` everything up to end-of-stream is returned.
    Otherwise data is collected until at least *size* bytes are available
    or the stream ends; bytes beyond *size* are stored back into
    ``self.sock.recvbuffer``.

    Raises ``TypeError`` when *size* is neither ``None`` nor an integer.
    """
    if size is not None and not isinstance(size, (int, long)):
        raise TypeError('Expecting an int or long for size, got %s: %s' % (type(size), repr(size)))

    sock = self.sock
    # Take over whatever an earlier read left in the buffer.
    buffered, sock.recvbuffer = sock.recvbuffer, ''
    pieces = [buffered]

    if size is None:
        while True:
            piece = sock.recv(BUFFER_SIZE)
            if not piece:
                break
            pieces.append(piece)
        return ''.join(pieces)

    collected = len(buffered)
    reached_eof = False
    while collected < size:
        piece = sock.recv(BUFFER_SIZE)
        if not piece:
            reached_eof = True
            break
        collected += len(piece)
        pieces.append(piece)

    if not reached_eof:
        # Enough data gathered: trim the last piece back to *size* and keep
        # the surplus for the next read.
        last = pieces[-1]
        surplus = collected - size
        if surplus:
            pieces[-1], sock.recvbuffer = last[:-surplus], last[-surplus:]
        else:
            pieces[-1], sock.recvbuffer = last, ''
    return ''.join(pieces)
class GreenFile(object):
    """File-like interface on top of a green socket-like object.

    The wrapped object is stored as ``self.sock``; reads use its ``recv``
    and ``recvbuffer``, writes use its ``sendall``.
    """
    newlines = '\r\n'
    mode = 'wb+'

    def __init__(self, fd):
        if isinstance(fd, GreenSocket):
            set_nonblocking(fd.fd)
        else:
            set_nonblocking(fd)
        self.sock = fd
        self.closed = False

    def close(self):
        self.sock.close()
        self.closed = True

    def fileno(self):
        return self.sock.fileno()

    # TODO next

    def flush(self):
        pass

    def write(self, data):
        return self.sock.sendall(data)

    def readuntil(self, terminator, size=None):
        """Read up to and including *terminator*.

        Returns at most *size* bytes when *size* is given. Reading stops
        early at end-of-stream. Anything received beyond the returned
        chunk is stored back into ``self.sock.recvbuffer``.
        """
        buf, self.sock.recvbuffer = self.sock.recvbuffer, ''
        term_len = len(terminator)
        checked = 0
        while True:
            found = buf.find(terminator, checked)
            if found != -1:
                # Stop right after the terminator, never past *size*.
                end = found + term_len
                if size is not None and end > size:
                    end = size
                break
            if size is not None and len(buf) >= size:
                end = size
                break
            # Re-scan the last len(terminator) - 1 bytes next time, so a
            # terminator split across two recv() calls is still found. The
            # sized path used to skip this and could miss it.
            checked = max(0, len(buf) - (term_len - 1))
            d = self.sock.recv(BUFFER_SIZE)
            if not d:
                end = len(buf) if size is None else size
                break
            buf += d
        chunk, self.sock.recvbuffer = buf[:end], buf[end:]
        return chunk

    def readline(self, size=None):
        return self.readuntil(self.newlines, size=size)

    def __iter__(self):
        return self.xreadlines()

    def readlines(self, size=None):
        return list(self.xreadlines(size=size))

    def xreadlines(self, size=None):
        """Yield lines until end-of-stream, or until roughly *size* bytes
        have been yielded when *size* is given."""
        if size is None:
            while True:
                line = self.readline()
                if not line:
                    break
                yield line
        else:
            while size > 0:
                line = self.readline(size)
                if not line:
                    break
                yield line
                size -= len(line)

    def writelines(self, lines):
        for line in lines:
            self.write(line)

    # Reuse the module-level read() function as a method.
    read = read
class GreenPipeSocket(GreenSocket):
    """Socket look-alike that is given a file object rather than a socket.

    ``recv`` and ``send`` go through the file-based helpers instead of the
    socket ones.
    """
    send = higher_order_send(file_send)
    recv = higher_order_recv(file_recv)
class GreenPipe(GreenFile):
    """GreenFile over a pipe-style file object, wrapped in a GreenPipeSocket."""

    def __init__(self, fd):
        set_nonblocking(fd)
        self.fd = GreenPipeSocket(fd)
        super(GreenPipe, self).__init__(self.fd)

    def recv(self, *args, **kw):
        # Rebind on the instance so later calls skip this wrapper.
        bound = self.recv = self.fd.recv
        return bound(*args, **kw)

    def send(self, *args, **kw):
        # Rebind on the instance so later calls skip this wrapper.
        bound = self.send = self.fd.send
        return bound(*args, **kw)

    def flush(self):
        # self.fd is the GreenPipeSocket; its .fd is the underlying file.
        self.fd.fd.flush()
-
-class RefCount(object):
- """ Reference counting class only to be used with GreenSSL objects """
- def __init__(self):
- self._count = 1
-
- def increment(self):
- self._count += 1
-
- def decrement(self):
- self._count -= 1
- assert self._count >= 0
-
- def is_referenced(self):
- return self._count > 0
-
-
-class GreenSSL(GreenSocket):
- def __init__(self, fd, refcount = None):
- GreenSocket.__init__(self, fd)
- self.sock = self
- self._refcount = refcount
- if refcount is None:
- self._refcount = RefCount()
-
- def read(self, buflen=1024):
- try:
- return self.sock.recv(buflen)
- except socket.error, e:
- if e[0] in BLOCKING_ERR:
- return None
- if e[0] in SOCKET_CLOSED:
- return ''
- raise
- except util.SSL.WantReadError:
- return None
- except util.SSL.ZeroReturnError:
- return ''
- except util.SSL.SysCallError, e:
- if e[0] == -1 or e[0] > 0:
- return ''
- raise
-
- def sendall(self, data):
- # overriding sendall because ssl sockets behave badly when asked to
- # send empty strings; 'normal' sockets don't have a problem
- if not data:
- return
- super(GreenSSL, self).sendall(data)
-
- def write(self, data):
- try:
- return self.sendall(data)
- except util.SSL.Error, ex:
- raise socket.sslerror(str(ex))
-
- def server(self):
- return self.fd.server()
-
- def issuer(self):
- return self.fd.issuer()
-
- def dup(self):
- raise NotImplementedError("Dup not supported on SSL sockets")
-
- def makefile(self, *args, **kw):
- self._refcount.increment()
- return GreenFile(type(self)(self.fd, refcount = self._refcount))
-
- makeGreenFile = makefile
-
- def close(self):
- self._refcount.decrement()
- if self._refcount.is_referenced():
- return
- super(GreenSSL, self).close()
-
-
-
-
def socketpair(*args):
    """Green counterpart of ``socket.socketpair``: both ends are wrapped
    in GreenSocket."""
    first, second = socket.socketpair(*args)
    return GreenSocket(first), GreenSocket(second)
def fromfd(*args):
    """Green counterpart of ``socket.fromfd``: the result is wrapped in
    GreenSocket."""
    real_socket = socket.fromfd(*args)
    return GreenSocket(real_socket)
diff --git a/eventlib/util.py b/eventlib/util.py
index 1b2bdb2..da3408e 100644
--- a/eventlib/util.py
+++ b/eventlib/util.py
@@ -1,247 +1,175 @@
# @author Bob Ippolito
#
# Copyright (c) 2005-2006, Bob Ippolito
# Copyright (c) 2007, Linden Research, Inc.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import os
import select
import socket
import errno
import sys
-try:
- from OpenSSL import SSL
-except ImportError:
- class SSL(object):
- class WantWriteError(object):
- pass
-
- class WantReadError(object):
- pass
-
- class ZeroReturnError(object):
- pass
-
- class SysCallError(object):
- pass
-
def g_log(*args):
    """Write *args*, space-joined, to stderr, prefixed with an identifier
    of the current greenlet.

    The identifier is ``greenlet-<id>``, where ``<id>`` is ``id()`` of the
    current greenlet object.
    """
    import sys
    from eventlib.support import greenlets as greenlet
    # id() always returns an integer, so the former ``if g_id is None``
    # branch ('greenlet-main' / hex id) could never run and is dropped.
    ident = 'greenlet-%d' % (id(greenlet.getcurrent()),)
    # sys.stderr.write produces the same output as ``print >>sys.stderr``
    # without relying on the Python-2-only print statement.
    sys.stderr.write('[%s] %s\n' % (ident, ' '.join(map(str, args))))
__original_socket__ = socket.socket
__original_gethostbyname__ = socket.gethostbyname
__original_getaddrinfo__ = socket.getaddrinfo
if sys.platform != 'win32':
__original_fromfd__ = socket.fromfd
def tcp_socket():
    """Return a new IPv4 stream socket built with the socket class saved
    in ``__original_socket__``."""
    return __original_socket__(socket.AF_INET, socket.SOCK_STREAM)
-try:
- __original_ssl__ = socket.ssl
-except AttributeError:
- __original_ssl__ = None
-
-
-def wrap_ssl(sock, certificate=None, private_key=None):
- from OpenSSL import SSL
- from eventlib import greenio
- context = SSL.Context(SSL.SSLv23_METHOD)
- if certificate is not None:
- context.use_certificate_file(certificate)
- if private_key is not None:
- context.use_privatekey_file(private_key)
- context.set_verify(SSL.VERIFY_NONE, lambda *x: True)
-
- ## TODO only do this on client sockets? how?
- connection = SSL.Connection(context, sock)
- connection.set_connect_state()
- return greenio.GreenSSL(connection)
-
-
-socket_already_wrapped = False
-def wrap_socket_with_coroutine_socket(use_thread_pool=True):
- global socket_already_wrapped
- if socket_already_wrapped:
- return
-
- def new_socket(*args, **kw):
- from eventlib import greenio
- return greenio.GreenSocket(__original_socket__(*args, **kw))
- socket.socket = new_socket
-
- socket.ssl = wrap_ssl
-
- if use_thread_pool:
- from eventlib import tpool
- def new_gethostbyname(*args, **kw):
- return tpool.execute(
- __original_gethostbyname__, *args, **kw)
- socket.gethostbyname = new_gethostbyname
-
- def new_getaddrinfo(*args, **kw):
- return tpool.execute(
- __original_getaddrinfo__, *args, **kw)
- socket.getaddrinfo = new_getaddrinfo
-
- if sys.platform != 'win32':
- def new_fromfd(*args, **kw):
- from eventlib import greenio
- return greenio.GreenSocket(__original_fromfd__(*args, **kw))
- socket.fromfd = new_fromfd
-
- socket_already_wrapped = True
-
-
__original_fdopen__ = os.fdopen
__original_read__ = os.read
__original_write__ = os.write
__original_waitpid__ = os.waitpid
if sys.platform != 'win32':
__original_fork__ = os.fork
## TODO wrappings for popen functions? not really needed since Process object exists?
pipes_already_wrapped = False
def wrap_pipes_with_coroutine_pipes():
    """Monkey patch ``os.fdopen``, ``os.read``, ``os.write``, ``os.waitpid``
    (and ``os.fork`` outside Windows) with hub-aware replacements.

    Only the first call patches anything; later calls return immediately.
    """
    from eventlib import processes ## Make sure the signal handler is installed
    global pipes_already_wrapped
    if pipes_already_wrapped:
        return

    def new_fdopen(*args, **kw):
        from eventlib import greenio
        return greenio.GreenPipe(__original_fdopen__(*args, **kw))

    def new_read(fd, *args, **kw):
        from eventlib import api
        try:
            api.trampoline(fd, read=True)
        except socket.error as e:
            # A broken pipe while waiting is reported as end-of-file.
            if e.args[0] == errno.EPIPE:
                return ''
            else:
                raise
        return __original_read__(fd, *args, **kw)

    def new_write(fd, *args, **kw):
        from eventlib import api
        api.trampoline(fd, write=True)
        return __original_write__(fd, *args, **kw)

    if sys.platform != 'win32':
        def new_fork(*args, **kwargs):
            pid = __original_fork__()
            if pid:
                # Parent side: register the child with the processes module.
                processes._add_child_pid(pid)
            return pid
        os.fork = new_fork

    def new_waitpid(pid, options):
        from eventlib import processes
        evt = processes.CHILD_EVENTS.get(pid)
        if not evt:
            return 0, 0
        if options == os.WNOHANG:
            if evt.ready():
                return pid, evt.wait()
            return 0, 0
        elif options:
            return __original_waitpid__(pid, options)
        return pid, evt.wait()

    os.fdopen = new_fdopen
    os.read = new_read
    os.write = new_write
    os.waitpid = new_waitpid

    # The guard above tests this flag, but nothing ever set it, so every
    # call re-applied the patches.
    pipes_already_wrapped = True
__original_select__ = select.select
try:
import threading
__original_threadlocal__ = threading.local
except ImportError:
pass
def wrap_threading_local_with_coro_local():
    """Monkey patch threading.local with a greenlet-aware replacement.

    Since greenlets cannot cross threads, this should be semantically
    identical to threading.local.
    """
    from eventlib import api

    def get_ident():
        return id(api.getcurrent())

    class local(object):
        # All per-greenlet values live in one dict stored under the
        # '__objs' key of the instance __dict__, keyed by greenlet id.
        def __init__(self):
            self.__dict__['__objs'] = {}

        def __getattr__(self, attr, g=get_ident):
            storage = self.__dict__['__objs']
            try:
                return storage[g()][attr]
            except KeyError:
                raise AttributeError(
                    "No variable %s defined for the thread %s"
                    % (attr, g()))

        def __setattr__(self, attr, value, g=get_ident):
            storage = self.__dict__['__objs']
            storage.setdefault(g(), {})[attr] = value

        def __delattr__(self, attr, g=get_ident):
            storage = self.__dict__['__objs']
            try:
                del storage[g()][attr]
            except KeyError:
                raise AttributeError(
                    "No variable %s defined for thread %s"
                    % (attr, g()))

    threading.local = local
def socket_bind_and_listen(sock, addr=('', 0), backlog=50):
    """Enable address reuse on *sock*, bind it to *addr*, start listening
    with *backlog*, and return the same socket."""
    set_reuse_addr(sock)
    sock.bind(addr)
    sock.listen(backlog)
    return sock
def set_reuse_addr(sock):
    """Turn on ``SO_REUSEADDR`` for *sock*, ignoring ``socket.error``."""
    try:
        current = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, current | 1)
    except socket.error:
        # Best effort only: a socket that rejects the option is left as is.
        pass

File Metadata

Mime Type
text/x-diff
Expires
Sat, Feb 1, 6:33 PM (1 d, 17 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3489340
Default Alt Text
(63 KB)

Event Timeline