diff --git a/eventlib/api.py b/eventlib/api.py index 6e1086e..2872de9 100644 --- a/eventlib/api.py +++ b/eventlib/api.py @@ -1,594 +1,594 @@ # @author Bob Ippolito # # Copyright (c) 2005-2006, Bob Ippolito # Copyright (c) 2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import sys import socket import string import linecache import inspect import threading from eventlib.support import greenlets as greenlet __all__ = [ 'call_after', 'exc_after', 'getcurrent', 'get_default_hub', 'get_hub', 'GreenletExit', 'kill', 'sleep', 'spawn', 'spew', 'switch', 'ssl_listener', 'tcp_listener', 'tcp_server', 'trampoline', 'unspew', 'use_hub', 'with_timeout', 'timeout'] def switch(coro, result=None, exc=None): if exc is not None: return coro.throw(exc) return coro.switch(result) Greenlet = greenlet.greenlet class TimeoutError(Exception): """Exception raised if an asynchronous operation times out""" pass _threadlocal = threading.local() def tcp_listener(address, backlog=50): """ Listen on the given (ip, port) *address* with a TCP socket. Returns a socket object on which one should call ``accept()`` to accept a connection on the newly bound socket. Generally, the returned socket will be passed to ``tcp_server()``, which accepts connections forever and spawns greenlets for each incoming connection. """ from eventlib import greenio, util socket = greenio.GreenSocket(util.tcp_socket()) util.socket_bind_and_listen(socket, address, backlog=backlog) return socket def ssl_listener(address, certificate, private_key): """Listen on the given (ip, port) *address* with a TCP socket that can do SSL. *certificate* and *private_key* should be the filenames of the appropriate certificate and private key files to use with the SSL socket. Returns a socket object on which one should call ``accept()`` to accept a connection on the newly bound socket. Generally, the returned socket will be passed to ``tcp_server()``, which accepts connections forever and spawns greenlets for each incoming connection. """ from eventlib import greenio, util from eventlib.green import ssl socket = greenio.GreenSocket(util.tcp_socket()) socket = ssl.wrap_socket(socket, keyfile=private_key, certfile=certificate, server_side=True) util.socket_bind_and_listen(socket, address) socket.is_secure = True return socket def connect_tcp(address, localaddr=None): """ Create a TCP connection to address (host, port) and return the socket. Optionally, bind to localaddr (host, port) first. 
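
    A minimal usage sketch (the address and payload here are illustrative
    assumptions, not part of the API)::

        from eventlib import api
        sock = api.connect_tcp(('127.0.0.1', 6000))
        sock.sendall(b'ping')
        sock.close()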
""" from eventlib import greenio, util desc = greenio.GreenSocket(util.tcp_socket()) if localaddr is not None: desc.bind(localaddr) desc.connect(address) return desc def tcp_server(listensocket, server, *args, **kw): """ Given a socket, accept connections forever, spawning greenlets and executing *server* for each new incoming connection. When *listensocket* is closed, the ``tcp_server()`` greenlet will end. listensocket The socket from which to accept connections. server The callable to call when a new connection is made. \*args The positional arguments to pass to *server*. \*\*kw The keyword arguments to pass to *server*. """ - print("tcpserver spawning %s on %s" % (server, listensocket.getsockname())) + print(("tcpserver spawning %s on %s" % (server, listensocket.getsockname()))) try: try: while True: spawn(server, listensocket.accept(), *args, **kw) except socket.error as e: # Broken pipe means it was shutdown - if e[0] != 32: + if e.errno != 32: raise finally: listensocket.close() def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=TimeoutError): """Suspend the current coroutine until the given socket object or file descriptor is ready to *read*, ready to *write*, or the specified *timeout* elapses, depending on arguments specified. To wait for *fd* to be ready to read, pass *read* ``=True``; ready to write, pass *write* ``=True``. To specify a timeout, pass the *timeout* argument in seconds. If the specified *timeout* elapses before the socket is ready to read or write, *timeout_exc* will be raised instead of ``trampoline()`` returning normally. """ t = None hub = get_hub() current = greenlet.getcurrent() assert hub.greenlet is not current, 'do not call blocking functions from the mainloop' fileno = getattr(fd, 'fileno', lambda: fd)() def _do_close(_d, error=None): if error is None: current.throw(socket.error(32, 'Broken pipe')) else: current.throw(getattr(error, 'value', error)) # XXX convert to socket.error def cb(d): current.switch() # with TwistedHub, descriptor is actually an object (socket_rwdescriptor) which stores # this callback. If this callback stores a reference to the socket instance (fd) # then descriptor has a reference to that instance. This makes socket not collected # after greenlet exit. Since nobody actually uses the results of this switch, I removed # fd from here. If it will be needed than an indirect reference which is discarded right # after the switch above should be used. 
if timeout is not None: t = hub.schedule_call(timeout, current.throw, timeout_exc) try: descriptor = hub.add_descriptor(fileno, read and cb, write and cb, _do_close) try: return hub.switch() finally: hub.remove_descriptor(descriptor) finally: if t is not None: t.cancel() def get_fileno(obj): try: f = obj.fileno except AttributeError: assert isinstance(obj, int) return obj else: return f() def select(read_list, write_list, error_list, timeout=None): hub = get_hub() t = None current = greenlet.getcurrent() assert hub.greenlet is not current, 'do not call blocking functions from the mainloop' ds = {} for r in read_list: ds[get_fileno(r)] = {'read' : r} for w in write_list: ds.setdefault(get_fileno(w), {})['write'] = w for e in error_list: ds.setdefault(get_fileno(e), {})['error'] = e descriptors = [] def on_read(d): original = ds[get_fileno(d)]['read'] current.switch(([original], [], [])) def on_write(d): original = ds[get_fileno(d)]['write'] current.switch(([], [original], [])) def on_error(d, _err=None): original = ds[get_fileno(d)]['error'] current.switch(([], [], [original])) def on_timeout(): current.switch(([], [], [])) if timeout is not None: t = hub.schedule_call(timeout, on_timeout) try: - for k, v in ds.items(): + for k, v in list(ds.items()): d = hub.add_descriptor(k, v.get('read') is not None and on_read, v.get('write') is not None and on_write, v.get('error') is not None and on_error) descriptors.append(d) try: return hub.switch() finally: for d in descriptors: hub.remove_descriptor(d) finally: if t is not None: t.cancel() def _spawn_startup(cb, args, kw, cancel=None): try: greenlet.getcurrent().parent.switch() cancel = None finally: if cancel is not None: cancel() return cb(*args, **kw) def _spawn(g): g.parent = greenlet.getcurrent() g.switch() def spawn(function, *args, **kwds): """Create a new coroutine, or cooperative thread of control, within which to execute *function*. The *function* will be called with the given *args* and keyword arguments *kwds* and will remain in control unless it cooperatively yields by calling a socket method or ``sleep()``. ``spawn()`` returns control to the caller immediately, and *function* will be called in a future main loop iteration. An uncaught exception in *function* or any child will terminate the new coroutine with a log message. """ # killable t = None g = Greenlet(_spawn_startup) t = get_hub().schedule_call_global(0, _spawn, g) g.switch(function, args, kwds, t.cancel) return g def kill(g, *throw_args): get_hub().schedule_call(0, g.throw, *throw_args) if getcurrent() is not get_hub().greenlet: sleep(0) def call_after_global(seconds, function, *args, **kwds): """Schedule *function* to be called after *seconds* have elapsed. The function will be scheduled even if the current greenlet has exited. *seconds* may be specified as an integer, or a float if fractional seconds are desired. The *function* will be called with the given *args* and keyword arguments *kwds*, and will be executed within the main loop's coroutine. Its return value is discarded. Any uncaught exception will be logged. """ # cancellable def startup(): g = Greenlet(_spawn_startup) g.switch(function, args, kwds) g.switch() t = get_hub().schedule_call_global(seconds, startup) return t def call_after_local(seconds, function, *args, **kwds): """Schedule *function* to be called after *seconds* have elapsed. The function will NOT be called if the current greenlet has exited. *seconds* may be specified as an integer, or a float if fractional seconds are desired. 
    The *function* will be called with the given *args* and keyword arguments
    *kwds*, and will be executed within the main loop's coroutine.

    Its return value is discarded. Any uncaught exception will be logged.
    """
    # cancellable
    def startup():
        g = Greenlet(_spawn_startup)
        g.switch(function, args, kwds)
        g.switch()
    t = get_hub().schedule_call_local(seconds, startup)
    return t

# for compatibility with original eventlib API
call_after = call_after_local

-class _SilentException:
+class _SilentException(BaseException):
    pass

class FakeTimer:
    def cancel(self):
        pass

class timeout:
    """Raise an exception in the block after timeout.

    with timeout(seconds[, exc]):
        ... code block ...

    Assuming the code block yields (i.e. gives up control to the hub), the
    exception provided in the `exc' argument will be raised
    (``TimeoutError`` if `exc' is omitted). When exc is None, the code block
    is interrupted silently.
    """

    def __init__(self, seconds, *throw_args):
        self.seconds = seconds
        if seconds is None:
            return
        if not throw_args:
            self.throw_args = (TimeoutError(), )
        elif throw_args == (None, ):
            self.throw_args = (_SilentException(), )
        else:
            self.throw_args = throw_args

    def __enter__(self):
        if self.seconds is None:
            self.timer = FakeTimer()
        else:
            self.timer = exc_after(self.seconds, *self.throw_args)
        return self.timer

    def __exit__(self, typ, value, tb):
        self.timer.cancel()
        if typ is _SilentException and value in self.throw_args:
            return True

def with_timeout(seconds, func, *args, **kwds):
    """Wrap a call to some (yielding) function with a timeout; if the called
    function fails to return before the timeout, cancel it and return a flag
    value.

    seconds
      (int or float) seconds before timeout occurs
    func
      the callable to execute with a timeout; must be one of the functions
      that implicitly or explicitly yields
    \*args, \*\*kwds
      (positional, keyword) arguments to pass to *func*
    timeout_value=
      value to return if timeout occurs (default raise ``TimeoutError``)

    **Returns**:

    Value returned by *func* if *func* returns before *seconds*, else
    *timeout_value* if provided, else raise ``TimeoutError``

    **Raises**:

    Any exception raised by *func*, and ``TimeoutError`` if *func* times out
    and no ``timeout_value`` has been provided.

    **Example**::

      data = with_timeout(30, httpc.get, 'http://www.google.com/', timeout_value="")

    Here *data* is either the result of the ``get()`` call, or the empty string
    if it took too long to return. Any exception raised by the ``get()`` call
    is passed through to the caller.
    """
    # Recognize a specific keyword argument, while also allowing pass-through
    # of any other keyword arguments accepted by func. Use pop() so we don't
    # pass timeout_value through to func().
    has_timeout_value = "timeout_value" in kwds
    timeout_value = kwds.pop("timeout_value", None)
    error = TimeoutError()
    timeout = exc_after(seconds, error)
    try:
        try:
            return func(*args, **kwds)
        except TimeoutError as ex:
            if ex is error and has_timeout_value:
                return timeout_value
            raise
    finally:
        timeout.cancel()

def exc_after(seconds, *throw_args):
    """Schedule an exception to be raised into the current coroutine after
    *seconds* have elapsed.

    This only works if the current coroutine is yielding, and is generally
    used to set timeouts after which a network operation or series of
    operations will be canceled.

    Returns a timer object with a ``cancel()`` method which should be used to
    prevent the exception if the operation completes successfully.

    See also ``with_timeout()`` that encapsulates the idiom below.
    Example::

        def read_with_timeout():
            timer = api.exc_after(30, RuntimeError())
            try:
                httpc.get('http://www.google.com/')
            except RuntimeError:
                print("Timed out!")
            else:
                timer.cancel()
    """
    hub = get_hub()
    return call_after(seconds, getcurrent().throw, *throw_args)

def get_default_hub():
    """Select the default hub implementation based on what multiplexing
    libraries are installed. Tries twistedr if a twisted reactor is imported,
    then poll, then select.
    """
    if 'twisted.internet.reactor' in sys.modules:
        from eventlib.hubs import twistedr
        return twistedr
    import select
    if hasattr(select, 'poll'):
        import eventlib.hubs.poll
        return eventlib.hubs.poll
    else:
        import eventlib.hubs.selects
        return eventlib.hubs.selects

def use_hub(mod=None):
    """Use the module *mod*, containing a class called Hub, as the event hub.
    Usually not required; the default hub is usually fine.
    """
    if mod is None:
        mod = get_default_hub()
    if hasattr(_threadlocal, 'hub'):
        del _threadlocal.hub
    if isinstance(mod, str):
        mod = __import__('eventlib.hubs.' + mod, globals(), locals(), ['Hub'])
    if hasattr(mod, 'Hub'):
        _threadlocal.Hub = mod.Hub
    else:
        _threadlocal.Hub = mod

def get_hub():
    """Get the current event hub singleton object.
    """
    try:
        hub = _threadlocal.hub
    except AttributeError:
        try:
            _threadlocal.Hub
        except AttributeError:
            use_hub()
        hub = _threadlocal.hub = _threadlocal.Hub()
    return hub

def sleep(seconds=0):
    """Yield control to another eligible coroutine until at least *seconds*
    have elapsed. *seconds* may be specified as an integer, or a float if
    fractional seconds are desired. Calling sleep with *seconds* of 0 is the
    canonical way of expressing a cooperative yield. For example, if one is
    looping over a large list performing an expensive calculation without
    calling any socket methods, it's a good idea to call ``sleep(0)``
    occasionally; otherwise nothing else will run.
    """
    hub = get_hub()
    assert hub.greenlet is not greenlet.getcurrent(), 'do not call blocking functions from the mainloop'
    timer = hub.schedule_call(seconds, greenlet.getcurrent().switch)
    try:
        hub.switch()
    finally:
        timer.cancel()

getcurrent = greenlet.getcurrent
GreenletExit = greenlet.GreenletExit

class Spew(object):
    """
    """
    def __init__(self, trace_names=None, show_values=True):
        self.trace_names = trace_names
        self.show_values = show_values

    def __call__(self, frame, event, arg):
        if event == 'line':
            lineno = frame.f_lineno
            if '__file__' in frame.f_globals:
                filename = frame.f_globals['__file__']
                if (filename.endswith('.pyc') or
                    filename.endswith('.pyo')):
                    filename = filename[:-1]
                name = frame.f_globals['__name__']
                line = linecache.getline(filename, lineno)
            else:
                name = '[unknown]'
                try:
                    src = inspect.getsourcelines(frame)
                    line = src[lineno]
                except IOError:
                    line = 'Unknown code named [%s]. VM instruction #%d' % (
                        frame.f_code.co_name, frame.f_lasti)
            if self.trace_names is None or name in self.trace_names:
                print('%s:%s: %s' % (name, lineno, line.rstrip()))
                if not self.show_values:
                    return self
                details = '\t'
-                tokens = line.translate(
-                    string.maketrans(' ,.()', '\0' * 5)).split('\0')
+                tokens = line.translate(
+                    str.maketrans(' ,.()', '\0' * 5)).split('\0')
                for tok in tokens:
                    if tok in frame.f_globals:
                        details += '%s=%r ' % (tok, frame.f_globals[tok])
                    if tok in frame.f_locals:
                        details += '%s=%r ' % (tok, frame.f_locals[tok])
                if details.strip():
                    print(details)
        return self

def spew(trace_names=None, show_values=False):
    """Install a trace hook which writes incredibly detailed logs
    about what code is being executed to stdout.
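
    Sketch of intended use (the module name 'myapp' is an assumed example)::

        from eventlib import api
        api.spew(trace_names=['myapp'])   # trace only frames whose __name__ is 'myapp'
        ...                               # exercise the code being debugged
        api.unspew()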
""" sys.settrace(Spew(trace_names, show_values)) def unspew(): """Remove the trace hook installed by spew. """ sys.settrace(None) def named(name): """Return an object given its name. The name uses a module-like syntax, eg:: os.path.join or:: mulib.mu.Resource """ toimport = name obj = None import_err_strings = [] while toimport: try: obj = __import__(toimport) break except ImportError as err: # print 'Import error on %s: %s' % (toimport, err) # debugging spam import_err_strings.append(err.__str__()) toimport = '.'.join(toimport.split('.')[:-1]) if obj is None: raise ImportError('%s could not be imported. Import errors: %r' % (name, import_err_strings)) for seg in name.split('.')[1:]: try: obj = getattr(obj, seg) except AttributeError: dirobj = dir(obj) dirobj.sort() raise AttributeError('attribute %r missing from %r (%r) %r. Import errors: %r' % ( seg, obj, dirobj, name, import_err_strings)) return obj diff --git a/eventlib/backdoor.py b/eventlib/backdoor.py index e9c4742..8349541 100644 --- a/eventlib/backdoor.py +++ b/eventlib/backdoor.py @@ -1,128 +1,128 @@ # @author Bob Ippolito # # Copyright (c) 2005-2006, Bob Ippolito # Copyright (c) 2007, Linden Research, Inc. # Copyright (c) 2008, Donovan Preston # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import socket import sys from code import InteractiveConsole from eventlib import api from eventlib.support import greenlets try: sys.ps1 except AttributeError: sys.ps1 = '>>> ' try: sys.ps2 except AttributeError: sys.ps2 = '... 
'

class SocketConsole(greenlets.greenlet):
    def __init__(self, desc, hostport, locals):
        self.hostport = hostport
        self.locals = locals
        # mangle the socket
        self.desc = desc
        readline = desc.readline
        self.old = {}
        self.fixups = {
            'softspace': 0,
            'isatty': lambda: True,
            'flush': lambda: None,
            'readline': lambda *a: readline(*a).replace('\r\n', '\n'),
        }
        for key, value in self.fixups.items():
            if hasattr(desc, key):
                self.old[key] = getattr(desc, key)
            setattr(desc, key, value)
        greenlets.greenlet.__init__(self)

    def run(self):
        try:
            console = InteractiveConsole(self.locals)
            console.interact()
        finally:
            self.switch_out()
            self.finalize()

    def switch(self, *args, **kw):
        self.saved = sys.stdin, sys.stderr, sys.stdout
        sys.stdin = sys.stdout = sys.stderr = self.desc
        greenlets.greenlet.switch(self, *args, **kw)

    def switch_out(self):
        sys.stdin, sys.stderr, sys.stdout = self.saved

    def finalize(self):
        # restore the state of the socket
        for key in self.fixups:
            try:
                value = self.old[key]
            except KeyError:
                delattr(self.desc, key)
            else:
                setattr(self.desc, key, value)
        self.fixups.clear()
        self.old.clear()
        self.desc = None
        print("backdoor closed to %s:%s" % self.hostport)

def backdoor_server(server, locals=None):
    print("backdoor listening on %s:%s" % server.getsockname())
    try:
        try:
            while True:
                (conn, (host, port)) = server.accept()
                print("backdoor connected to %s:%s" % (host, port))
                fl = conn.makeGreenFile("rw")
                fl.newlines = '\n'
                greenlet = SocketConsole(fl, (host, port), locals)
                hub = api.get_hub()
                hub.schedule_call_global(0, greenlet.switch)
        except socket.error as e:
            # Broken pipe means it was shutdown
-            if e[0] != 32:
+            if e.errno != 32:
                raise
    finally:
        server.close()

-def backdoor(xxx_todo_changeme, locals=None):
+def backdoor(connection, locals=None):
    """ Use this with tcp_server like so:
        api.tcp_server(
            api.tcp_listener(('127.0.0.1', 9000)),
            backdoor.backdoor,
            {})
    """
-    (conn, addr) = xxx_todo_changeme
+    (conn, addr) = connection
    host, port = addr
    print("backdoor to %s:%s" % (host, port))
    fl = conn.makeGreenFile("rw")
    fl.newlines = '\n'
    greenlet = SocketConsole(fl, (host, port), locals)
    hub = api.get_hub()
    hub.schedule_call_global(0, greenlet.switch)

diff --git a/eventlib/green/BaseHTTPServer.py b/eventlib/green/BaseHTTPServer.py
index 18afed2..36a2fe4 100644
--- a/eventlib/green/BaseHTTPServer.py
+++ b/eventlib/green/BaseHTTPServer.py
@@ -1,54 +1,51 @@
import sys
from eventlib.green import socket
from eventlib.green import SocketServer

-__import_lst = ['DEFAULT_ERROR_MESSAGE', '_quote_html', '__version__', '__all__', 'BaseHTTPRequestHandler']
-__BaseHTTPServer = __import__('BaseHTTPServer')
-for var in __import_lst:
-    exec("%s = __BaseHTTPServer.%s" % (var, var))
+from http.server import (DEFAULT_ERROR_MESSAGE, __version__, __all__, BaseHTTPRequestHandler)

class HTTPServer(SocketServer.TCPServer):
    allow_reuse_address = 1    # Seems to make sense in testing environment

    def server_bind(self):
        """Override server_bind to store the server name."""
        SocketServer.TCPServer.server_bind(self)
        host, port = self.socket.getsockname()[:2]
        self.server_name = socket.getfqdn(host)
        self.server_port = port

class BaseHTTPRequestHandler(BaseHTTPRequestHandler):
    def address_string(self):
        host, port = self.client_address[:2]
        return socket.getfqdn(host)

def test(HandlerClass = BaseHTTPRequestHandler,
         ServerClass = HTTPServer, protocol="HTTP/1.0"):
    """Test the HTTP request handler class.

    This runs an HTTP server on port 8000 (or the first command line
    argument).
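
    For example (the port is arbitrary)::

        python -m eventlib.green.BaseHTTPServer 8080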
""" if sys.argv[1:]: port = int(sys.argv[1]) else: port = 8000 server_address = ('', port) HandlerClass.protocol_version = protocol httpd = ServerClass(server_address, HandlerClass) sa = httpd.socket.getsockname() print("Serving HTTP on", sa[0], "port", sa[1], "...") httpd.serve_forever() if __name__ == '__main__': test() diff --git a/eventlib/green/SocketServer.py b/eventlib/green/SocketServer.py index 41052e7..8b0f887 100644 --- a/eventlib/green/SocketServer.py +++ b/eventlib/green/SocketServer.py @@ -1,59 +1,55 @@ -__import_lst = ['__all__', '__version__', 'BaseServer', 'TCPServer', 'UDPServer', 'ForkingMixIn', - 'ThreadingMixIn', 'BaseRequestHandler', 'StreamRequestHandler', 'DatagramRequestHandler'] -__SocketServer = __import__('SocketServer') -for var in __import_lst: - exec("%s = __SocketServer.%s" % (var, var)) +from socketserver import( __all__, __version__, BaseServer, TCPServer, UDPServer, ForkingMixIn, ThreadingMixIn, BaseRequestHandler, StreamRequestHandler, DatagramRequestHandler ) # QQQ ForkingMixIn should be fixed to use green waitpid? from eventlib.green import socket class TCPServer(TCPServer): def __init__(self, server_address, RequestHandlerClass): """Constructor. May be extended, do not override.""" BaseServer.__init__(self, server_address, RequestHandlerClass) self.socket = socket.socket(self.address_family, self.socket_type) self.server_bind() self.server_activate() class UDPServer(UDPServer): def __init__(self, server_address, RequestHandlerClass): """Constructor. May be extended, do not override.""" BaseServer.__init__(self, server_address, RequestHandlerClass) self.socket = socket.socket(self.address_family, self.socket_type) self.server_bind() self.server_activate() class ThreadingMixIn(ThreadingMixIn): def process_request(self, request, client_address): """Start a new thread to process the request.""" from eventlib.green import threading t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) if self.daemon_threads: t.setDaemon (1) t.start() class ForkingUDPServer(ForkingMixIn, UDPServer): pass class ForkingTCPServer(ForkingMixIn, TCPServer): pass class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass if hasattr(socket, 'AF_UNIX'): class UnixStreamServer(TCPServer): address_family = socket.AF_UNIX class UnixDatagramServer(UDPServer): address_family = socket.AF_UNIX class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass diff --git a/eventlib/green/httplib.py b/eventlib/green/httplib.py index 1c80022..e3beee1 100644 --- a/eventlib/green/httplib.py +++ b/eventlib/green/httplib.py @@ -1,1410 +1,1410 @@ """HTTP/1.1 client library HTTPConnection goes through a number of "states", which define when a client may legally make another request or fetch the response for a particular request. 
This diagram details these state transitions: (null) | | HTTPConnection() v Idle | | putrequest() v Request-started | | ( putheader() )* endheaders() v Request-sent | | response = getresponse() v Unread-response [Response-headers-read] |\____________________ | | | response.read() | putrequest() v v Idle Req-started-unread-response ______/| / | response.read() | | ( putheader() )* endheaders() v v Request-started Req-sent-unread-response | | response.read() v Request-sent This diagram presents the following rules: -- a second request may not be started until {response-headers-read} -- a response [object] cannot be retrieved until {request-sent} -- there is no differentiation between an unread response body and a partially read response body Note: this enforcement is applied by the HTTPConnection class. The HTTPResponse class does not enforce this state machine, which implies sophisticated clients may accelerate the request/response pipeline. Caution should be taken, though: accelerating the states beyond the above pattern may imply knowledge of the server's connection-close behavior for certain requests. For example, it is impossible to tell whether the server will close the connection UNTIL the response headers have been read; this means that further requests cannot be placed into the pipeline until it is known that the server will NOT be closing the connection. Logical State __state __response ------------- ------- ---------- Idle _CS_IDLE None Request-started _CS_REQ_STARTED None Request-sent _CS_REQ_SENT None Unread-response _CS_IDLE Req-started-unread-response _CS_REQ_STARTED Req-sent-unread-response _CS_REQ_SENT """ import errno -import mimetools +from email.message import Message from eventlib.green import socket from urllib.parse import urlsplit try: from io import StringIO except ImportError: from io import StringIO __all__ = ["HTTP", "HTTPResponse", "HTTPConnection", "HTTPSConnection", "HTTPException", "NotConnected", "UnknownProtocol", "UnknownTransferEncoding", "UnimplementedFileMode", "IncompleteRead", "InvalidURL", "ImproperConnectionState", "CannotSendRequest", "CannotSendHeader", "ResponseNotReady", "BadStatusLine", "error", "responses"] HTTP_PORT = 80 HTTPS_PORT = 443 _UNKNOWN = 'UNKNOWN' # connection states _CS_IDLE = 'Idle' _CS_REQ_STARTED = 'Request-started' _CS_REQ_SENT = 'Request-sent' # status codes # informational CONTINUE = 100 SWITCHING_PROTOCOLS = 101 PROCESSING = 102 # successful OK = 200 CREATED = 201 ACCEPTED = 202 NON_AUTHORITATIVE_INFORMATION = 203 NO_CONTENT = 204 RESET_CONTENT = 205 PARTIAL_CONTENT = 206 MULTI_STATUS = 207 IM_USED = 226 # redirection MULTIPLE_CHOICES = 300 MOVED_PERMANENTLY = 301 FOUND = 302 SEE_OTHER = 303 NOT_MODIFIED = 304 USE_PROXY = 305 TEMPORARY_REDIRECT = 307 # client error BAD_REQUEST = 400 UNAUTHORIZED = 401 PAYMENT_REQUIRED = 402 FORBIDDEN = 403 NOT_FOUND = 404 METHOD_NOT_ALLOWED = 405 NOT_ACCEPTABLE = 406 PROXY_AUTHENTICATION_REQUIRED = 407 REQUEST_TIMEOUT = 408 CONFLICT = 409 GONE = 410 LENGTH_REQUIRED = 411 PRECONDITION_FAILED = 412 REQUEST_ENTITY_TOO_LARGE = 413 REQUEST_URI_TOO_LONG = 414 UNSUPPORTED_MEDIA_TYPE = 415 REQUESTED_RANGE_NOT_SATISFIABLE = 416 EXPECTATION_FAILED = 417 UNPROCESSABLE_ENTITY = 422 LOCKED = 423 FAILED_DEPENDENCY = 424 UPGRADE_REQUIRED = 426 # server error INTERNAL_SERVER_ERROR = 500 NOT_IMPLEMENTED = 501 BAD_GATEWAY = 502 SERVICE_UNAVAILABLE = 503 GATEWAY_TIMEOUT = 504 HTTP_VERSION_NOT_SUPPORTED = 505 INSUFFICIENT_STORAGE = 507 NOT_EXTENDED = 510 # Mapping status codes to official W3C names responses = 
{ 100: 'Continue', 101: 'Switching Protocols', 200: 'OK', 201: 'Created', 202: 'Accepted', 203: 'Non-Authoritative Information', 204: 'No Content', 205: 'Reset Content', 206: 'Partial Content', 300: 'Multiple Choices', 301: 'Moved Permanently', 302: 'Found', 303: 'See Other', 304: 'Not Modified', 305: 'Use Proxy', 306: '(Unused)', 307: 'Temporary Redirect', 400: 'Bad Request', 401: 'Unauthorized', 402: 'Payment Required', 403: 'Forbidden', 404: 'Not Found', 405: 'Method Not Allowed', 406: 'Not Acceptable', 407: 'Proxy Authentication Required', 408: 'Request Timeout', 409: 'Conflict', 410: 'Gone', 411: 'Length Required', 412: 'Precondition Failed', 413: 'Request Entity Too Large', 414: 'Request-URI Too Long', 415: 'Unsupported Media Type', 416: 'Requested Range Not Satisfiable', 417: 'Expectation Failed', 500: 'Internal Server Error', 501: 'Not Implemented', 502: 'Bad Gateway', 503: 'Service Unavailable', 504: 'Gateway Timeout', 505: 'HTTP Version Not Supported', } # maximal amount of data to read at one time in _safe_read MAXAMOUNT = 1048576 -class HTTPMessage(mimetools.Message): +class HTTPMessage(Message): def addheader(self, key, value): """Add header for field key handling repeats.""" prev = self.dict.get(key) if prev is None: self.dict[key] = value else: combined = ", ".join((prev, value)) self.dict[key] = combined def addcontinue(self, key, more): """Add more field data from a continuation line.""" prev = self.dict[key] self.dict[key] = prev + "\n " + more def readheaders(self): """Read header lines. Read header lines up to the entirely blank line that terminates them. The (normally blank) line that ends the headers is skipped, but not included in the returned list. If a non-header line ends the headers, (which is an error), an attempt is made to backspace over it; it is never included in the returned list. The variable self.status is set to the empty string if all went well, otherwise it is an error message. The variable self.headers is a completely uninterpreted list of lines contained in the header (so printing them will reproduce the header exactly as it appears in the file). If multiple header fields with the same name occur, they are combined according to the rules in RFC 2616 sec 4.2: Appending each subsequent field-value to the first, each separated by a comma. The order in which header fields with the same field-name are received is significant to the interpretation of the combined field value. """ # XXX The implementation overrides the readheaders() method of # rfc822.Message. The base class design isn't amenable to # customized behavior here so the method here is a copy of the # base class code with a few small changes. self.dict = {} self.unixfrom = '' self.headers = hlist = [] self.status = '' headerseen = "" firstline = 1 startofline = unread = tell = None if hasattr(self.fp, 'unread'): unread = self.fp.unread elif self.seekable: tell = self.fp.tell while True: if tell: try: startofline = tell() except IOError: startofline = tell = None self.seekable = 0 line = self.fp.readline() if not line: self.status = 'EOF in headers' break # Skip unix From name time lines if firstline and line.startswith('From '): self.unixfrom = self.unixfrom + line continue firstline = 0 if headerseen and line[0] in ' \t': # XXX Not sure if continuation lines are handled properly # for http and/or for repeating headers # It's a continuation line. hlist.append(line) self.addcontinue(headerseen, line.strip()) continue elif self.iscomment(line): # It's a comment. Ignore it. 
continue elif self.islast(line): # Note! No pushback here! The delimiter line gets eaten. break headerseen = self.isheader(line) if headerseen: # It's a legal header line, save it. hlist.append(line) self.addheader(headerseen, line[len(headerseen)+1:].strip()) continue else: # It's not a header line; throw it back and stop here. if not self.dict: self.status = 'No headers' else: self.status = 'Non-header line where header expected' # Try to undo the read. if unread: unread(line) elif tell: self.fp.seek(startofline) else: self.status = self.status + '; bad seek' break class HTTPResponse: # strict: If true, raise BadStatusLine if the status line can't be # parsed as a valid HTTP/1.0 or 1.1 status line. By default it is # false because it prevents clients from talking to HTTP/0.9 # servers. Note that a response with a sufficiently corrupted # status line will look like an HTTP/0.9 response. # See RFC 2616 sec 19.6 and RFC 1945 sec 6 for details. def __init__(self, sock, debuglevel=0, strict=0, method=None): self.fp = sock.makefile('rb', 0) self.debuglevel = debuglevel self.strict = strict self._method = method self.msg = None # from the Status-Line of the response self.version = _UNKNOWN # HTTP-Version self.status = _UNKNOWN # Status-Code self.reason = _UNKNOWN # Reason-Phrase self.chunked = _UNKNOWN # is "chunked" being used? self.chunk_left = _UNKNOWN # bytes left to read in current chunk self.length = _UNKNOWN # number of bytes left in response self.will_close = _UNKNOWN # conn will close at end of response def _read_status(self): # Initialize with Simple-Response defaults line = self.fp.readline() if self.debuglevel > 0: print("reply:", repr(line)) if not line: # Presumably, the server closed the connection before # sending a valid response. raise BadStatusLine(line) try: [version, status, reason] = line.split(None, 2) except ValueError: try: [version, status] = line.split(None, 1) reason = "" except ValueError: # empty version will cause next test to fail and status # will be treated as 0.9 response. version = "" if not version.startswith('HTTP/'): if self.strict: self.close() raise BadStatusLine(line) else: # assume it's a Simple-Response from an 0.9 server self.fp = LineAndFileWrapper(line, self.fp) return "HTTP/0.9", 200, "" # The status code is a three-digit number try: status = int(status) if status < 100 or status > 999: raise BadStatusLine(line) except ValueError: raise BadStatusLine(line) return version, status, reason def begin(self): if self.msg is not None: # we've already started reading the response return # read until we get a non-100 response while True: version, status, reason = self._read_status() if status != CONTINUE: break # skip the header from the 100 response while True: skip = self.fp.readline().strip() if not skip: break if self.debuglevel > 0: print("header:", skip) self.status = status self.reason = reason.strip() if version == 'HTTP/1.0': self.version = 10 elif version.startswith('HTTP/1.'): self.version = 11 # use HTTP/1.1 code for HTTP/1.x where x>=1 elif version == 'HTTP/0.9': self.version = 9 else: raise UnknownProtocol(version) if self.version == 9: self.length = None self.chunked = 0 self.will_close = 1 self.msg = HTTPMessage(StringIO()) return self.msg = HTTPMessage(self.fp, 0) if self.debuglevel > 0: for hdr in self.msg.headers: print("header:", hdr, end=' ') # don't let the msg keep an fp self.msg.fp = None # are we using the chunked-style of transfer encoding? 
tr_enc = self.msg.getheader('transfer-encoding') if tr_enc and tr_enc.lower() == "chunked": self.chunked = 1 self.chunk_left = None else: self.chunked = 0 # will the connection close at the end of the response? self.will_close = self._check_close() # do we have a Content-Length? # NOTE: RFC 2616, S4.4, #3 says we ignore this if tr_enc is "chunked" length = self.msg.getheader('content-length') if length and not self.chunked: try: self.length = int(length) except ValueError: self.length = None else: self.length = None # does the body have a fixed length? (of zero) if (status == NO_CONTENT or status == NOT_MODIFIED or 100 <= status < 200 or # 1xx codes self._method == 'HEAD'): self.length = 0 # if the connection remains open, and we aren't using chunked, and # a content-length was not provided, then assume that the connection # WILL close. if not self.will_close and \ not self.chunked and \ self.length is None: self.will_close = 1 def _check_close(self): conn = self.msg.getheader('connection') if self.version == 11: # An HTTP/1.1 proxy is assumed to stay open unless # explicitly closed. conn = self.msg.getheader('connection') if conn and "close" in conn.lower(): return True return False # Some HTTP/1.0 implementations have support for persistent # connections, using rules different than HTTP/1.1. # For older HTTP, Keep-Alive indiciates persistent connection. if self.msg.getheader('keep-alive'): return False # At least Akamai returns a "Connection: Keep-Alive" header, # which was supposed to be sent by the client. if conn and "keep-alive" in conn.lower(): return False # Proxy-Connection is a netscape hack. pconn = self.msg.getheader('proxy-connection') if pconn and "keep-alive" in pconn.lower(): return False # otherwise, assume it will close return True def close(self): if self.fp: self.fp.close() self.fp = None def isclosed(self): # NOTE: it is possible that we will not ever call self.close(). This # case occurs when will_close is TRUE, length is None, and we # read up to the last byte, but NOT past it. # # IMPLIES: if will_close is FALSE, then self.close() will ALWAYS be # called, meaning self.isclosed() is meaningful. return self.fp is None # XXX It would be nice to have readline and __iter__ for this, too. def read(self, amt=None): if self.fp is None: return '' if self.chunked: return self._read_chunked(amt) if amt is None: # unbounded read if self.length is None: s = self.fp.read() else: s = self._safe_read(self.length) self.length = 0 self.close() # we read everything return s if self.length is not None: if amt > self.length: # clip the read to the "end of response" amt = self.length # we do not use _safe_read() here because this may be a .will_close # connection, and the user is reading more bytes than will be provided # (for example, reading in 1k chunks) s = self.fp.read(amt) if self.length is not None: self.length -= len(s) return s def _read_chunked(self, amt): assert self.chunked != _UNKNOWN chunk_left = self.chunk_left value = '' # XXX This accumulates chunks by repeated string concatenation, # which is not efficient as the number or size of chunks gets big. 
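        # Wire-format reminder (RFC 2616 section 3.6.1): each chunk arrives as
        #     <hex chunk-size>[;extensions]CRLF <chunk-data> CRLF
        # and the body ends with a zero-size chunk, optional trailers, and a
        # blank line, which the loop below parses and discards.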
while True: if chunk_left is None: line = self.fp.readline() i = line.find(';') if i >= 0: line = line[:i] # strip chunk-extensions chunk_left = int(line, 16) if chunk_left == 0: break if amt is None: value += self._safe_read(chunk_left) elif amt < chunk_left: value += self._safe_read(amt) self.chunk_left = chunk_left - amt return value elif amt == chunk_left: value += self._safe_read(amt) self._safe_read(2) # toss the CRLF at the end of the chunk self.chunk_left = None return value else: value += self._safe_read(chunk_left) amt -= chunk_left # we read the whole chunk, get another self._safe_read(2) # toss the CRLF at the end of the chunk chunk_left = None # read and discard trailer up to the CRLF terminator ### note: we shouldn't have any trailers! while True: line = self.fp.readline() if not line: # a vanishingly small number of sites EOF without # sending the trailer break if line == '\r\n': break # we read everything; close the "file" self.close() return value def _safe_read(self, amt): """Read the number of bytes requested, compensating for partial reads. Normally, we have a blocking socket, but a read() can be interrupted by a signal (resulting in a partial read). Note that we cannot distinguish between EOF and an interrupt when zero bytes have been read. IncompleteRead() will be raised in this situation. This function should be used when bytes "should" be present for reading. If the bytes are truly not available (due to EOF), then the IncompleteRead exception can be used to detect the problem. """ s = [] while amt > 0: chunk = self.fp.read(min(amt, MAXAMOUNT)) if not chunk: raise IncompleteRead(s) s.append(chunk) amt -= len(chunk) return ''.join(s) def getheader(self, name, default=None): if self.msg is None: raise ResponseNotReady() return self.msg.getheader(name, default) def getheaders(self): """Return list of (header, value) tuples.""" if self.msg is None: raise ResponseNotReady() return list(self.msg.items()) class HTTPConnection: _http_vsn = 11 _http_vsn_str = 'HTTP/1.1' response_class = HTTPResponse default_port = HTTP_PORT auto_open = 1 debuglevel = 0 strict = 0 def __init__(self, host, port=None, strict=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): self.sock = None self.timeout = timeout self._buffer = [] self.__response = None self.__state = _CS_IDLE self._method = None self._set_hostport(host, port) if strict is not None: self.strict = strict def _set_hostport(self, host, port): if port is None: i = host.rfind(':') j = host.rfind(']') # ipv6 addresses have [...] if i > j: try: port = int(host[i+1:]) except ValueError: raise InvalidURL("nonnumeric port: '%s'" % host[i+1:]) host = host[:i] else: port = self.default_port if host and host[0] == '[' and host[-1] == ']': host = host[1:-1] self.host = host self.port = port def set_debuglevel(self, level): self.debuglevel = level def connect(self): self.sock = socket.create_connection((self.host, self.port), self.timeout) def close(self): """Close the connection to the HTTP server.""" if self.sock: self.sock.close() # close it manually... there may be other refs self.sock = None if self.__response: self.__response.close() self.__response = None self.__state = _CS_IDLE def send(self, str): """Send `str' to the server.""" if self.sock is None: if self.auto_open: self.connect() else: raise NotConnected() # send the data to the server. if we get a broken pipe, then close # the socket. we want to reconnect when somebody tries to send again. # # NOTE: we DO propagate the error, though, because we cannot simply # ignore the error... 
the caller will know if they can retry.
        if self.debuglevel > 0:
            print("send:", repr(str))
        try:
            self.sock.sendall(str)
        except socket.error as v:
-            if v[0] == 32:      # Broken pipe
+            if v.errno == 32:   # Broken pipe
                self.close()
            raise

    def _output(self, s):
        """Add a line of output to the current request buffer.

        Assumes that the line does *not* end with \\r\\n.
        """
        self._buffer.append(s)

    def _send_output(self):
        """Send the currently buffered request and clear the buffer.

        Appends an extra \\r\\n to the buffer.
        """
        self._buffer.extend(("", ""))
        msg = "\r\n".join(self._buffer)
        del self._buffer[:]
        self.send(msg)

    def putrequest(self, method, url, skip_host=0, skip_accept_encoding=0):
        """Send a request to the server.

        `method' specifies an HTTP request method, e.g. 'GET'.
        `url' specifies the object being requested, e.g. '/index.html'.
        `skip_host' if True does not add automatically a 'Host:' header
        `skip_accept_encoding' if True does not add automatically an
           'Accept-Encoding:' header
        """

        # if a prior response has been completed, then forget about it.
        if self.__response and self.__response.isclosed():
            self.__response = None

        # in certain cases, we cannot issue another request on this connection.
        # this occurs when:
        #   1) we are in the process of sending a request. (_CS_REQ_STARTED)
        #   2) a response to a previous request has signalled that it is going
        #      to close the connection upon completion.
        #   3) the headers for the previous response have not been read, thus
        #      we cannot determine whether point (2) is true. (_CS_REQ_SENT)
        #
        # if there is no prior response, then we can request at will.
        #
        # if point (2) is true, then we will have passed the socket to the
        # response (effectively meaning, "there is no prior response"), and
        # will open a new one when a new request is made.
        #
        # Note: if a prior response exists, then we *can* start a new request.
        #       We are not allowed to begin fetching the response to this new
        #       request, however, until that prior response is complete.
        #
        if self.__state == _CS_IDLE:
            self.__state = _CS_REQ_STARTED
        else:
            raise CannotSendRequest()

        # Save the method we use, we need it later in the response phase
        self._method = method
        if not url:
            url = '/'
        str = '%s %s %s' % (method, url, self._http_vsn_str)

        self._output(str)

        if self._http_vsn == 11:
            # Issue some standard headers for better HTTP/1.1 compliance

            if not skip_host:
                # this header is issued *only* for HTTP/1.1
                # connections. more specifically, this means it is
                # only issued when the client uses the new
                # HTTPConnection() class. backwards-compat clients
                # will be using HTTP/1.0 and those clients may be
                # issuing this header themselves. we should NOT issue
                # it twice; some web servers (such as Apache) barf
                # when they see two Host: headers

                # If we need a non-standard port, include it in the header.
                # If the request is going through a proxy, send the host of
                # the actual URL, not the host of the proxy.

                netloc = ''
                if url.startswith('http'):
                    nil, netloc, nil, nil, nil = urlsplit(url)

                if netloc:
                    try:
                        netloc_enc = netloc.encode("ascii")
                    except UnicodeEncodeError:
                        netloc_enc = netloc.encode("idna")
                    self.putheader('Host', netloc_enc)
                else:
                    try:
                        host_enc = self.host.encode("ascii")
                    except UnicodeEncodeError:
                        host_enc = self.host.encode("idna")
                    if self.port == HTTP_PORT:
                        self.putheader('Host', host_enc)
                    else:
                        self.putheader('Host', "%s:%s" % (host_enc, self.port))

            # note: we are assuming that clients will not attempt to set these
            #       headers since *this* library must deal with the
            #       consequences.
this also means that when the supporting # libraries are updated to recognize other forms, then this # code should be changed (removed or updated). # we only want a Content-Encoding of "identity" since we don't # support encodings such as x-gzip or x-deflate. if not skip_accept_encoding: self.putheader('Accept-Encoding', 'identity') # we can accept "chunked" Transfer-Encodings, but no others # NOTE: no TE header implies *only* "chunked" #self.putheader('TE', 'chunked') # if TE is supplied in the header, then it must appear in a # Connection header. #self.putheader('Connection', 'TE') else: # For HTTP/1.0, the server will assume "not chunked" pass def putheader(self, header, value): """Send a request header line to the server. For example: h.putheader('Accept', 'text/html') """ if self.__state != _CS_REQ_STARTED: raise CannotSendHeader() str = '%s: %s' % (header, value) self._output(str) def endheaders(self): """Indicate that the last header line has been sent to the server.""" if self.__state == _CS_REQ_STARTED: self.__state = _CS_REQ_SENT else: raise CannotSendHeader() self._send_output() def request(self, method, url, body=None, headers={}): """Send a complete request to the server.""" try: self._send_request(method, url, body, headers) except socket.error as v: # trap 'Broken pipe' if we're allowed to automatically reconnect if v.args[0] != 32 or not self.auto_open: raise # try one more time self._send_request(method, url, body, headers) def _send_request(self, method, url, body, headers): # honour explicitly requested Host: and Accept-Encoding headers header_names = dict.fromkeys([k.lower() for k in headers]) skips = {} if 'host' in header_names: skips['skip_host'] = 1 if 'accept-encoding' in header_names: skips['skip_accept_encoding'] = 1 self.putrequest(method, url, **skips) if body and ('content-length' not in header_names): self.putheader('Content-Length', str(len(body))) for hdr, value in headers.items(): self.putheader(hdr, value) self.endheaders() if body: self.send(body) def getresponse(self): "Get the response from the server." # if a prior response has been completed, then forget about it. if self.__response and self.__response.isclosed(): self.__response = None # # if a prior response exists, then it must be completed (otherwise, we # cannot read this response's header to determine the connection-close # behavior) # # note: if a prior response existed, but was connection-close, then the # socket and response were made independent of this HTTPConnection # object since a new request requires that we open a whole new # connection # # this means the prior response had one of two states: # 1) will_close: this connection was reset and the prior socket and # response operate independently # 2) persistent: the response was retained and we await its # isclosed() status to become true. # if self.__state != _CS_REQ_SENT or self.__response: raise ResponseNotReady() if self.debuglevel > 0: response = self.response_class(self.sock, self.debuglevel, strict=self.strict, method=self._method) else: response = self.response_class(self.sock, strict=self.strict, method=self._method) response.begin() assert response.will_close != _UNKNOWN self.__state = _CS_IDLE if response.will_close: # this effectively passes the connection to the response self.close() else: # remember this, so we can tell when it is complete self.__response = response return response # The next several classes are used to define FakeSocket, a socket-like # interface to an SSL connection. 
# The primary complexity comes from faking a makefile() method. The # standard socket makefile() implementation calls dup() on the socket # file descriptor. As a consequence, clients can call close() on the # parent socket and its makefile children in any order. The underlying # socket isn't closed until they are all closed. # The implementation uses reference counting to keep the socket open # until the last client calls close(). SharedSocket keeps track of # the reference counting and SharedSocketClient provides an constructor # and close() method that call incref() and decref() correctly. class SharedSocket: def __init__(self, sock): self.sock = sock self._refcnt = 0 def incref(self): self._refcnt += 1 def decref(self): self._refcnt -= 1 assert self._refcnt >= 0 if self._refcnt == 0: self.sock.close() def __del__(self): self.sock.close() class SharedSocketClient: def __init__(self, shared): self._closed = 0 self._shared = shared self._shared.incref() self._sock = shared.sock def close(self): if not self._closed: self._shared.decref() self._closed = 1 self._shared = None class SSLFile(SharedSocketClient): """File-like object wrapping an SSL socket.""" BUFSIZE = 8192 def __init__(self, sock, ssl, bufsize=None): SharedSocketClient.__init__(self, sock) self._ssl = ssl self._buf = '' self._bufsize = bufsize or self.__class__.BUFSIZE def _read(self): buf = '' # put in a loop so that we retry on transient errors while True: try: buf = self._ssl.read(self._bufsize) except socket.sslerror as err: if (err[0] == socket.SSL_ERROR_WANT_READ or err[0] == socket.SSL_ERROR_WANT_WRITE): continue if (err[0] == socket.SSL_ERROR_ZERO_RETURN or err[0] == socket.SSL_ERROR_EOF): break raise except socket.error as err: if err[0] == errno.EINTR: continue if err[0] == errno.EBADF: # XXX socket was closed? 
break raise else: break return buf def read(self, size=None): L = [self._buf] avail = len(self._buf) while size is None or avail < size: s = self._read() if s == '': break L.append(s) avail += len(s) all = "".join(L) if size is None: self._buf = '' return all else: self._buf = all[size:] return all[:size] def readline(self): L = [self._buf] self._buf = '' while 1: i = L[-1].find("\n") if i >= 0: break s = self._read() if s == '': break L.append(s) if i == -1: # loop exited because there is no more data return "".join(L) else: all = "".join(L) # XXX could do enough bookkeeping not to do a 2nd search i = all.find("\n") + 1 line = all[:i] self._buf = all[i:] return line def readlines(self, sizehint=0): total = 0 list = [] while True: line = self.readline() if not line: break list.append(line) total += len(line) if sizehint and total >= sizehint: break return list def fileno(self): return self._sock.fileno() def __iter__(self): return self def __next__(self): line = self.readline() if not line: raise StopIteration return line class FakeSocket(SharedSocketClient): class _closedsocket: def __getattr__(self, name): raise error(9, 'Bad file descriptor') def __init__(self, sock, ssl): sock = SharedSocket(sock) SharedSocketClient.__init__(self, sock) self._ssl = ssl def close(self): SharedSocketClient.close(self) self._sock = self.__class__._closedsocket() def makefile(self, mode, bufsize=None): if mode != 'r' and mode != 'rb': raise UnimplementedFileMode() return SSLFile(self._shared, self._ssl, bufsize) def send(self, stuff, flags = 0): return self._ssl.write(stuff) sendall = send def recv(self, len = 1024, flags = 0): return self._ssl.read(len) def __getattr__(self, attr): return getattr(self._sock, attr) class HTTPSConnection(HTTPConnection): "This class allows communication via SSL." default_port = HTTPS_PORT def __init__(self, host, port=None, key_file=None, cert_file=None, strict=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): HTTPConnection.__init__(self, host, port, strict, timeout) self.key_file = key_file self.cert_file = cert_file def connect(self): sock = socket.create_connection((self.host, self.port), self.timeout) ssl = socket.ssl(sock, self.key_file, self.cert_file) self.sock = FakeSocket(sock, ssl) class HTTP: "Compatibility class with httplib.py from 1.5." _http_vsn = 10 _http_vsn_str = 'HTTP/1.0' debuglevel = 0 _connection_class = HTTPConnection def __init__(self, host='', port=None, strict=None): "Provide a default host, since the superclass requires one." # some joker passed 0 explicitly, meaning default port if port == 0: port = None # Note that we may pass an empty string as the host; this will throw # an error when we attempt to connect. Presumably, the client code # will call connect before then, with a proper host. self._setup(self._connection_class(host, port, strict)) def _setup(self, conn): self._conn = conn # set up delegation to flesh out interface self.send = conn.send self.putrequest = conn.putrequest self.endheaders = conn.endheaders self.set_debuglevel = conn.set_debuglevel conn._http_vsn = self._http_vsn conn._http_vsn_str = self._http_vsn_str self.file = None def connect(self, host=None, port=None): "Accept arguments to set the host/port, since the superclass doesn't." if host is not None: self._conn._set_hostport(host, port) self._conn.connect() def getfile(self): "Provide a getfile, since the superclass' does not use this concept." return self.file def putheader(self, header, *values): "The superclass allows only one value argument." 
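        # Multiple values are folded into one field by joining them with
        # CRLF + tab, i.e. RFC 822 header continuation lines.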
self._conn.putheader(header, '\r\n\t'.join(values)) def getreply(self): """Compat definition since superclass does not define it. Returns a tuple consisting of: - server status code (e.g. '200' if all goes well) - server "reason" corresponding to status code - any RFC822 headers in the response from the server """ try: response = self._conn.getresponse() except BadStatusLine as e: ### hmm. if getresponse() ever closes the socket on a bad request, ### then we are going to have problems with self.sock ### should we keep this behavior? do people use it? # keep the socket open (as a file), and return it self.file = self._conn.sock.makefile('rb', 0) # close our socket -- we want to restart after any protocol error self.close() self.headers = None return -1, e.line, None self.headers = response.msg self.file = response.fp return response.status, response.reason, response.msg def close(self): self._conn.close() # note that self.file == response.fp, which gets closed by the # superclass. just clear the object ref here. ### hmm. messy. if status==-1, then self.file is owned by us. ### well... we aren't explicitly closing, but losing this ref will ### do it self.file = None if hasattr(socket, 'ssl'): class HTTPS(HTTP): """Compatibility with 1.5 httplib interface Python 1.5.2 did not have an HTTPS class, but it defined an interface for sending http requests that is also useful for https. """ _connection_class = HTTPSConnection def __init__(self, host='', port=None, key_file=None, cert_file=None, strict=None): # provide a default host, pass the X509 cert info # urf. compensate for bad input. if port == 0: port = None self._setup(self._connection_class(host, port, key_file, cert_file, strict)) # we never actually use these for anything, but we keep them # here for compatibility with post-1.5.2 CVS. self.key_file = key_file self.cert_file = cert_file class HTTPException(Exception): # Subclasses that define an __init__ must call Exception.__init__ # or define self.args. Otherwise, str() will fail. pass class NotConnected(HTTPException): pass class InvalidURL(HTTPException): pass class UnknownProtocol(HTTPException): def __init__(self, version): self.args = version, self.version = version class UnknownTransferEncoding(HTTPException): pass class UnimplementedFileMode(HTTPException): pass class IncompleteRead(HTTPException): def __init__(self, partial): self.args = partial, self.partial = partial class ImproperConnectionState(HTTPException): pass class CannotSendRequest(ImproperConnectionState): pass class CannotSendHeader(ImproperConnectionState): pass class ResponseNotReady(ImproperConnectionState): pass class BadStatusLine(HTTPException): def __init__(self, line): self.args = line, self.line = line # for backwards compatibility error = HTTPException class LineAndFileWrapper: """A limited file-like object for HTTP/0.9 responses.""" # The status-line parsing code calls readline(), which normally # get the HTTP status line. For a 0.9 response, however, this is # actually the first line of the body! Clients need to get a # readable file object that contains that line. def __init__(self, line, file): self._line = line self._file = file self._line_consumed = 0 self._line_offset = 0 self._line_left = len(line) def __getattr__(self, attr): return getattr(self._file, attr) def _done(self): # called when the last byte is read from the line. After the # call, all read methods are delegated to the underlying file # object. 
self._line_consumed = 1 self.read = self._file.read self.readline = self._file.readline self.readlines = self._file.readlines def read(self, amt=None): if self._line_consumed: return self._file.read(amt) assert self._line_left if amt is None or amt > self._line_left: s = self._line[self._line_offset:] self._done() if amt is None: return s + self._file.read() else: return s + self._file.read(amt - len(s)) else: assert amt <= self._line_left i = self._line_offset j = i + amt s = self._line[i:j] self._line_offset = j self._line_left -= amt if self._line_left == 0: self._done() return s def readline(self): if self._line_consumed: return self._file.readline() assert self._line_left s = self._line[self._line_offset:] self._done() return s def readlines(self, size=None): if self._line_consumed: return self._file.readlines(size) assert self._line_left L = [self._line[self._line_offset:]] self._done() if size is None: return L + self._file.readlines() else: return L + self._file.readlines(size) def test(): """Test this module. A hodge podge of tests collected here, because they have too many external dependencies for the regular test suite. """ import sys import getopt opts, args = getopt.getopt(sys.argv[1:], 'd') dl = 0 for o, a in opts: if o == '-d': dl = dl + 1 host = 'www.python.org' selector = '/' if args[0:]: host = args[0] if args[1:]: selector = args[1] h = HTTP() h.set_debuglevel(dl) h.connect(host) h.putrequest('GET', selector) h.endheaders() status, reason, headers = h.getreply() print('status =', status) print('reason =', reason) print("read", len(h.getfile().read())) print() if headers: for header in headers.headers: print(header.strip()) print() # minimal test that code to extract host from url works class HTTP11(HTTP): _http_vsn = 11 _http_vsn_str = 'HTTP/1.1' h = HTTP11('www.python.org') h.putrequest('GET', 'http://www.python.org/~jeremy/') h.endheaders() h.getreply() h.close() if hasattr(socket, 'ssl'): for host, selector in (('sourceforge.net', '/projects/python'), ): print("https://%s%s" % (host, selector)) hs = HTTPS() hs.set_debuglevel(dl) hs.connect(host) hs.putrequest('GET', selector) hs.endheaders() status, reason, headers = hs.getreply() print('status =', status) print('reason =', reason) print("read", len(hs.getfile().read())) print() if headers: for header in headers.headers: print(header.strip()) print() if __name__ == '__main__': test() diff --git a/eventlib/green/socket.py b/eventlib/green/socket.py index 556b67c..60d3248 100644 --- a/eventlib/green/socket.py +++ b/eventlib/green/socket.py @@ -1,112 +1,110 @@ -__socket = __import__('socket') -for var in __socket.__all__: - exec("%s = __socket.%s" % (var, var)) -_fileobject = __socket._fileobject +import socket as __socket +from socket import (__all__, error, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR, getdefaulttimeout, gethostname, getnameinfo, getservbyname, herror, htonl, SOCK_DGRAM, timeout, gaierror, SOCK_RAW, setdefaulttimeout, getservbyport, gethostbyaddr, ntohl, htons, ntohs, getfqdn, SOCK_RDM, SOCK_SEQPACKET) from eventlib.api import get_hub from eventlib.greenio import GreenSocket as socket from eventlib.greenio import socketpair, fromfd +from socket import SocketIO import warnings - def gethostbyname(name): if getattr(get_hub(), 'uses_twisted_reactor', None): globals()['gethostbyname'] = _gethostbyname_twisted else: globals()['gethostbyname'] = _gethostbyname_tpool return globals()['gethostbyname'](name) def _gethostbyname_twisted(name): from twisted.internet import reactor from eventlib.twistedutil 
import block_on as _block_on return _block_on(reactor.resolve(name)) def _gethostbyname_tpool(name): from eventlib import tpool return tpool.execute( __socket.gethostbyname, name) def getaddrinfo(*args, **kw): if getattr(get_hub(), 'uses_twisted_reactor', None): globals()['getaddrinfo'] = _getaddrinfo_twisted else: globals()['getaddrinfo'] = _getaddrinfo_tpool return globals()['getaddrinfo'](*args, **kw) def _getaddrinfo_twisted(*args, **kw): from twisted.internet.threads import deferToThread from eventlib.twistedutil import block_on as _block_on return _block_on(deferToThread(__socket.getaddrinfo, *args, **kw)) def _getaddrinfo_tpool(*args, **kw): from eventlib import tpool return tpool.execute( __socket.getaddrinfo, *args, **kw) # XXX there're few more blocking functions in socket # XXX having a hub-independent way to access thread pool would be nice _GLOBAL_DEFAULT_TIMEOUT = __socket._GLOBAL_DEFAULT_TIMEOUT def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT): """Connect to *address* and return the socket object. Convenience function. Connect to *address* (a 2-tuple ``(host, port)``) and return the socket object. Passing the optional *timeout* parameter will set the timeout on the socket instance before attempting to connect. If no *timeout* is supplied, the global default timeout setting returned by :func:`getdefaulttimeout` is used. """ msg = "getaddrinfo returns an empty list" host, port = address for res in getaddrinfo(host, port, 0, SOCK_STREAM): af, socktype, proto, canonname, sa = res sock = None try: sock = socket(af, socktype, proto) if timeout is not _GLOBAL_DEFAULT_TIMEOUT: sock.settimeout(timeout) sock.connect(sa) return sock except error as msg: if sock is not None: sock.close() raise error(msg) try: from eventlib.green import ssl as ssl_module except ImportError: # no ssl support pass else: # some constants the SSL module exports but not in __all__ from eventlib.green.ssl import (RAND_add, RAND_status, SSL_ERROR_ZERO_RETURN, SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE, SSL_ERROR_WANT_X509_LOOKUP, SSL_ERROR_SYSCALL, SSL_ERROR_SSL, SSL_ERROR_WANT_CONNECT, SSL_ERROR_EOF, SSL_ERROR_INVALID_ERROR_CODE) try: sslerror = __socket.sslerror __socket.ssl def ssl(sock, certificate=None, private_key=None): warnings.warn("socket.ssl() is deprecated. 
Use ssl.wrap_socket() instead.", DeprecationWarning, stacklevel=2) return ssl_module.sslwrap_simple(sock, private_key, certificate) except AttributeError: # if the real socket module doesn't have the ssl method or sslerror # exception, we don't emulate them pass diff --git a/eventlib/green/ssl.py b/eventlib/green/ssl.py index 1cadfd7..8accc08 100644 --- a/eventlib/green/ssl.py +++ b/eventlib/green/ssl.py @@ -1,259 +1,261 @@ -__ssl = __import__('ssl') -for var in (var for var in dir(__ssl) if not var.startswith('__')): - exec("%s = __ssl.%s" % (var, var)) -del var +import ssl as __ssl +from ssl import _ssl +from _ssl import ( RAND_add, RAND_status, SSL_ERROR_ZERO_RETURN, SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE, SSL_ERROR_WANT_X509_LOOKUP, SSL_ERROR_SYSCALL, SSL_ERROR_SSL, SSL_ERROR_WANT_CONNECT, SSL_ERROR_EOF, SSL_ERROR_INVALID_ERROR_CODE, SSLError ) time = __import__('time') from eventlib.api import trampoline -from eventlib.greenio import set_nonblocking, GreenSocket, CONNECT_ERR, CONNECT_SUCCESS, BLOCKING_ERR +from eventlib.greenio import ( set_nonblocking, GreenSocket, GreenFile, CONNECT_ERR, CONNECT_SUCCESS, BLOCKING_ERR ) orig_socket = __import__('socket') socket = orig_socket.socket __patched__ = ['SSLSocket', 'wrap_socket', 'sslwrap_simple'] class GreenSSLSocket(__ssl.SSLSocket): """ This is a green version of the SSLSocket class from the ssl module added in 2.6. For documentation on it, please see the Python standard documentation. Python nonblocking ssl objects don't give errors when the other end of the socket is closed (they do notice when the other end is shutdown, though). Any write/read operations will simply hang if the socket is closed from the other end. There is no obvious fix for this problem; it appears to be a limitation of Python's ssl object implementation. A workaround is to set a reasonable timeout on the socket using settimeout(), and to close/reopen the connection when a timeout occurs at an unexpected juncture in the code. """ # we are inheriting from SSLSocket because its constructor calls # do_handshake whose behavior we wish to override def __init__(self, sock, *args, **kw): if not isinstance(sock, GreenSocket): sock = GreenSocket(sock) self.act_non_blocking = sock.act_non_blocking self._timeout = sock.gettimeout() super(GreenSSLSocket, self).__init__(sock.fd, *args, **kw) # the superclass initializer trashes the methods so we remove # the local-object versions of them and let the actual class # methods shine through try: for fn in orig_socket._delegate_methods: delattr(self, fn) except AttributeError: pass def settimeout(self, timeout): self._timeout = timeout def gettimeout(self): return self._timeout def setblocking(self, flag): if flag: self.act_non_blocking = False self._timeout = None else: self.act_non_blocking = True self._timeout = 0.0 def _call_trampolining(self, func, *a, **kw): if self.act_non_blocking: return func(*a, **kw) else: while True: try: return func(*a, **kw) except SSLError as e: if e.args[0] == SSL_ERROR_WANT_READ: trampoline(self, read=True, timeout=self.gettimeout(), timeout_exc=SSLError('timed out')) elif e.args[0] == SSL_ERROR_WANT_WRITE: trampoline(self, write=True, timeout=self.gettimeout(), timeout_exc=SSLError('timed out')) else: raise def write(self, data): """Write DATA to the underlying SSL channel. Returns number of bytes of DATA actually transmitted.""" return self._call_trampolining( super(GreenSSLSocket, self).write, data) def read(self, len=1024): """Read up to LEN bytes and return them. 
Return zero-length string on EOF.""" return self._call_trampolining( super(GreenSSLSocket, self).read, len) def send (self, data, flags=0): + if not isinstance(data, bytes): + data = data.encode() if self._sslobj: return self._call_trampolining( super(GreenSSLSocket, self).send, data, flags) else: trampoline(self, write=True, timeout_exc=SSLError('timed out')) return socket.send(self, data, flags) def sendto (self, data, addr, flags=0): # *NOTE: gross, copied code from ssl.py because it's not factored well enough to be used as-is if self._sslobj: raise ValueError("sendto not allowed on instances of %s" % self.__class__) else: trampoline(self, write=True, timeout_exc=SSLError('timed out')) return socket.sendto(self, data, addr, flags) def sendall (self, data, flags=0): # *NOTE: gross, copied code from ssl.py because it's not factored well enough to be used as-is if self._sslobj: if flags != 0: raise ValueError( "non-zero flags not allowed in calls to sendall() on %s" % self.__class__) amount = len(data) count = 0 while (count < amount): v = self.send(data[count:]) count += v return amount else: trampoline(self, write=True, timeout_exc=SSLError('timed out')) return socket.sendall(self, data, flags) def recv(self, buflen=1024, flags=0): # *NOTE: gross, copied code from ssl.py because it's not factored well enough to be used as-is if self._sslobj: if flags != 0: raise ValueError( "non-zero flags not allowed in calls to recv() on %s" % self.__class__) read = self.read(buflen) return read else: return socket.recv(self, buflen, flags) def recv_into (self, buffer, nbytes=None, flags=0): if not self.act_non_blocking: trampoline(self, read=True, timeout=self.gettimeout(), timeout_exc=SSLError('timed out')) return super(GreenSSLSocket, self).recv_into(buffer, nbytes, flags) def recvfrom (self, addr, buflen=1024, flags=0): if not self.act_non_blocking: trampoline(self, read=True, timeout=self.gettimeout(), timeout_exc=SSLError('timed out')) return super(GreenSSLSocket, self).recvfrom(addr, buflen, flags) def recvfrom_into (self, buffer, nbytes=None, flags=0): if not self.act_non_blocking: trampoline(self, read=True, timeout=self.gettimeout(), timeout_exc=SSLError('timed out')) return super(GreenSSLSocket, self).recvfrom_into(buffer, nbytes, flags) def unwrap(self): return GreenSocket(super(GreenSSLSocket, self).unwrap()) def do_handshake(self): """Perform a TLS/SSL handshake.""" return self._call_trampolining( super(GreenSSLSocket, self).do_handshake) def _socket_connect(self, addr): real_connect = socket.connect if self.act_non_blocking: return real_connect(self, addr) else: # *NOTE: gross, copied code from greenio because it's not factored # well enough to reuse if self.gettimeout() is None: while True: try: return real_connect(self, addr) except orig_socket.error as e: if e.args[0] in CONNECT_ERR: trampoline(self, write=True) elif e.args[0] in CONNECT_SUCCESS: return else: raise else: end = time.time() + self.gettimeout() while True: try: real_connect(self, addr) except orig_socket.error as e: if e.args[0] in CONNECT_ERR: trampoline(self, write=True, timeout=end-time.time(), timeout_exc=SSLError('timed out')) elif e.args[0] in CONNECT_SUCCESS: return else: raise if time.time() >= end: raise SSLError('timed out') def connect(self, addr): """Connects to remote ADDR, and then wraps the connection in an SSL channel.""" # *NOTE: grrrrr copied this code from ssl.py because of the reference # to socket.connect which we don't want to call directly if self._sslobj: raise ValueError("attempt to connect 
already-connected SSLSocket!") self._socket_connect(addr) self._sslobj = _ssl.sslwrap(self._sock, False, self.keyfile, self.certfile, self.cert_reqs, self.ssl_version, self.ca_certs, self.ciphers) if self.do_handshake_on_connect: self.do_handshake() def accept(self): """Accepts a new connection from a remote client, and returns a tuple containing that new connection wrapped with a server-side SSL channel, and the address of the remote client.""" # RDW grr duplication of code from greenio if self.act_non_blocking: newsock, addr = socket.accept(self) else: while True: try: newsock, addr = socket.accept(self) set_nonblocking(newsock) break except orig_socket.error as e: if e.args[0] not in BLOCKING_ERR: raise trampoline(self, read=True, timeout=self.gettimeout(), timeout_exc=SSLError('timed out')) new_ssl = type(self)(newsock, keyfile=self.keyfile, certfile=self.certfile, server_side=True, cert_reqs=self.cert_reqs, ssl_version=self.ssl_version, ca_certs=self.ca_certs, do_handshake_on_connect=self.do_handshake_on_connect, suppress_ragged_eofs=self.suppress_ragged_eofs) return (new_ssl, addr) def dup(self): raise NotImplementedError("Can't dup an ssl object") SSLSocket = GreenSSLSocket def wrap_socket(sock, *a, **kw): return GreenSSLSocket(sock, *a, **kw) if hasattr(__ssl, 'sslwrap_simple'): def sslwrap_simple(sock, keyfile=None, certfile=None): """A replacement for the old socket.ssl function. Designed for compability with Python 2.5 and earlier. Will disappear in Python 3.0.""" ssl_sock = GreenSSLSocket(sock, keyfile=keyfile, certfile=certfile, server_side=False, cert_reqs=CERT_NONE, ssl_version=PROTOCOL_SSLv23, + do_handshake_on_connect=False, ca_certs=None) return ssl_sock diff --git a/eventlib/green/thread.py b/eventlib/green/thread.py index 9ff5370..4e19059 100644 --- a/eventlib/green/thread.py +++ b/eventlib/green/thread.py @@ -1,35 +1,35 @@ """implements standard module 'thread' with greenlets""" -__thread = __import__('thread') +__thread = __import__('_thread') from eventlib.support import greenlets as greenlet from eventlib.api import spawn from eventlib.coros import Semaphore as LockType error = __thread.error def get_ident(gr=None): if gr is None: return id(greenlet.getcurrent()) else: return id(gr) def start_new_thread(function, args=(), kwargs={}): g = spawn(function, *args, **kwargs) return get_ident(g) def allocate_lock(): return LockType(1) def exit(): raise greenlet.GreenletExit if hasattr(__thread, 'stack_size'): def stack_size(size=None): if size is None: return __thread.stack_size() if size > __thread.stack_size(): return __thread.stack_size(size) else: pass # not going to decrease stack_size, because otherwise other greenlets in this thread will suffer # XXX interrupt_main diff --git a/eventlib/green/threading.py b/eventlib/green/threading.py index b92920c..c53c703 100644 --- a/eventlib/green/threading.py +++ b/eventlib/green/threading.py @@ -1,853 +1,853 @@ """Thread module emulating a subset of Java's threading model.""" import sys as _sys from eventlib.green import thread from eventlib.green.time import time as _time, sleep as _sleep from traceback import format_exc as _format_exc from collections import deque # Rename some stuff so "from threading import *" is safe __all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Timer', 'setprofile', 'settrace', 'local'] _start_new_thread = thread.start_new_thread _allocate_lock = thread.allocate_lock _get_ident = thread.get_ident ThreadError = 
thread.error del thread # Debug support (adapted from ihooks.py). # All the major classes here derive from _Verbose. We force that to # be a new-style class so that all the major classes here are new-style. # This helps debugging (type(instance) is more revealing for instances # of new-style classes). _VERBOSE = False if __debug__: class _Verbose(object): def __init__(self, verbose=None): if verbose is None: verbose = _VERBOSE self.__verbose = verbose def _note(self, format, *args): if self.__verbose: format = format % args format = "%s: %s\n" % ( currentThread().getName(), format) _sys.stderr.write(format) else: # Disable this when using "python -O" class _Verbose(object): def __init__(self, verbose=None): pass def _note(self, *args): pass # Support for profile and trace hooks _profile_hook = None _trace_hook = None def setprofile(func): global _profile_hook _profile_hook = func def settrace(func): global _trace_hook _trace_hook = func # Synchronization classes Lock = _allocate_lock def RLock(*args, **kwargs): return _RLock(*args, **kwargs) class _RLock(_Verbose): def __init__(self, verbose=None): _Verbose.__init__(self, verbose) self.__block = _allocate_lock() self.__owner = None self.__count = 0 def __repr__(self): owner = self.__owner return "<%s(%s, %d)>" % ( self.__class__.__name__, owner and owner.getName(), self.__count) def acquire(self, blocking=1): me = currentThread() if self.__owner is me: self.__count = self.__count + 1 if __debug__: self._note("%s.acquire(%s): recursive success", self, blocking) return 1 rc = self.__block.acquire(blocking) if rc: self.__owner = me self.__count = 1 if __debug__: self._note("%s.acquire(%s): initial success", self, blocking) else: if __debug__: self._note("%s.acquire(%s): failure", self, blocking) return rc __enter__ = acquire def release(self): if self.__owner is not currentThread(): raise RuntimeError("cannot release un-acquired lock") self.__count = count = self.__count - 1 if not count: self.__owner = None self.__block.release() if __debug__: self._note("%s.release(): final release", self) else: if __debug__: self._note("%s.release(): non-final release", self) def __exit__(self, t, v, tb): self.release() # Internal methods used by condition variables - def _acquire_restore(self, xxx_todo_changeme): - (count, owner) = xxx_todo_changeme + def _acquire_restore(self, state): + (count, owner) = state self.__block.acquire() self.__count = count self.__owner = owner if __debug__: self._note("%s._acquire_restore()", self) def _release_save(self): if __debug__: self._note("%s._release_save()", self) count = self.__count self.__count = 0 owner = self.__owner self.__owner = None self.__block.release() return (count, owner) def _is_owned(self): return self.__owner is currentThread() def Condition(*args, **kwargs): return _Condition(*args, **kwargs) class _Condition(_Verbose): def __init__(self, lock=None, verbose=None): _Verbose.__init__(self, verbose) if lock is None: lock = RLock() self.__lock = lock # Export the lock's acquire() and release() methods self.acquire = lock.acquire self.release = lock.release # If the lock defines _release_save() and/or _acquire_restore(), # these override the default implementations (which just call # release() and acquire() on the lock). Ditto for _is_owned().
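# The distinction matters for recursive locks: a single release() cannot
# free an RLock held several times by its owner, so _RLock._release_save()
# zeroes the recursion count and _acquire_restore() rebuilds it; the
# lock-provided implementations are therefore preferred when present.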
try: self._release_save = lock._release_save except AttributeError: pass try: self._acquire_restore = lock._acquire_restore except AttributeError: pass try: self._is_owned = lock._is_owned except AttributeError: pass self.__waiters = [] def __enter__(self): return self.__lock.__enter__() def __exit__(self, *args): return self.__lock.__exit__(*args) def __repr__(self): return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters)) def _release_save(self): self.__lock.release() # No state to save def _acquire_restore(self, x): self.__lock.acquire() # Ignore saved state def _is_owned(self): # Return True if lock is owned by currentThread. # This method is called only if __lock doesn't have _is_owned(). if self.__lock.acquire(0): self.__lock.release() return False else: return True def wait(self, timeout=None): if not self._is_owned(): raise RuntimeError("cannot wait on un-acquired lock") waiter = _allocate_lock() waiter.acquire() self.__waiters.append(waiter) saved_state = self._release_save() try: # restore state no matter what (e.g., KeyboardInterrupt) if timeout is None: waiter.acquire() if __debug__: self._note("%s.wait(): got it", self) else: # Balancing act: We can't afford a pure busy loop, so we # have to sleep; but if we sleep the whole timeout time, # we'll be unresponsive. The scheme here sleeps very # little at first, longer as time goes on, but never longer # than 20 times per second (or the timeout time remaining). endtime = _time() + timeout delay = 0.0005 # 500 us -> initial delay of 1 ms while True: gotit = waiter.acquire(0) if gotit: break remaining = endtime - _time() if remaining <= 0: break delay = min(delay * 2, remaining, .05) _sleep(delay) if not gotit: if __debug__: self._note("%s.wait(%s): timed out", self, timeout) try: self.__waiters.remove(waiter) except ValueError: pass else: if __debug__: self._note("%s.wait(%s): got it", self, timeout) finally: self._acquire_restore(saved_state) def notify(self, n=1): if not self._is_owned(): raise RuntimeError("cannot notify on un-acquired lock") __waiters = self.__waiters waiters = __waiters[:n] if not waiters: if __debug__: self._note("%s.notify(): no waiters", self) return self._note("%s.notify(): notifying %d waiter%s", self, n, n!=1 and "s" or "") for waiter in waiters: waiter.release() try: __waiters.remove(waiter) except ValueError: pass def notifyAll(self): self.notify(len(self.__waiters)) def Semaphore(*args, **kwargs): return _Semaphore(*args, **kwargs) class _Semaphore(_Verbose): # After Tim Peters' semaphore class, but not quite the same (no maximum) def __init__(self, value=1, verbose=None): if value < 0: raise ValueError("semaphore initial value must be >= 0") _Verbose.__init__(self, verbose) self.__cond = Condition(Lock()) self.__value = value def acquire(self, blocking=1): rc = False self.__cond.acquire() while self.__value == 0: if not blocking: break if __debug__: self._note("%s.acquire(%s): blocked waiting, value=%s", self, blocking, self.__value) self.__cond.wait() else: self.__value = self.__value - 1 if __debug__: self._note("%s.acquire: success, value=%s", self, self.__value) rc = True self.__cond.release() return rc __enter__ = acquire def release(self): self.__cond.acquire() self.__value = self.__value + 1 if __debug__: self._note("%s.release: success, value=%s", self, self.__value) self.__cond.notify() self.__cond.release() def __exit__(self, t, v, tb): self.release() def BoundedSemaphore(*args, **kwargs): return _BoundedSemaphore(*args, **kwargs) class _BoundedSemaphore(_Semaphore): """Semaphore that checks that # 
releases is <= # acquires""" def __init__(self, value=1, verbose=None): _Semaphore.__init__(self, value, verbose) self._initial_value = value def release(self): if self._Semaphore__value >= self._initial_value: raise ValueError("Semaphore released too many times") return _Semaphore.release(self) def Event(*args, **kwargs): return _Event(*args, **kwargs) class _Event(_Verbose): # After Tim Peters' event class (without is_posted()) def __init__(self, verbose=None): _Verbose.__init__(self, verbose) self.__cond = Condition(Lock()) self.__flag = False def isSet(self): return self.__flag def set(self): self.__cond.acquire() try: self.__flag = True self.__cond.notifyAll() finally: self.__cond.release() def clear(self): self.__cond.acquire() try: self.__flag = False finally: self.__cond.release() def wait(self, timeout=None): self.__cond.acquire() try: if not self.__flag: self.__cond.wait(timeout) finally: self.__cond.release() # Helper to generate new thread names _counter = 0 def _newname(template="Thread-%d"): global _counter _counter = _counter + 1 return template % _counter # Active thread administration _active_limbo_lock = _allocate_lock() _active = {} # maps thread id to Thread object _limbo = {} # Main class for threads class Thread(_Verbose): __initialized = False # Need to store a reference to sys.exc_info for printing # out exceptions when a thread tries to use a global var. during interp. # shutdown and thus raises an exception about trying to perform some # operation on/with a NoneType __exc_info = _sys.exc_info def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None): assert group is None, "group argument must be None for now" _Verbose.__init__(self, verbose) if kwargs is None: kwargs = {} self.__target = target self.__name = str(name or _newname()) self.__args = args self.__kwargs = kwargs self.__daemonic = self._set_daemon() self.__started = False self.__stopped = False self.__block = Condition(Lock()) self.__initialized = True # sys.stderr is not stored in the class like # sys.exc_info since it can be changed between instances self.__stderr = _sys.stderr def _set_daemon(self): # Overridden in _MainThread and _DummyThread return currentThread().isDaemon() def __repr__(self): assert self.__initialized, "Thread.__init__() was not called" status = "initial" if self.__started: status = "started" if self.__stopped: status = "stopped" if self.__daemonic: status = status + " daemon" return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status) def start(self): if not self.__initialized: raise RuntimeError("thread.__init__() not called") if self.__started: raise RuntimeError("thread already started") if __debug__: self._note("%s.start(): starting thread", self) _active_limbo_lock.acquire() _limbo[self] = self _active_limbo_lock.release() _start_new_thread(self.__bootstrap, ()) self.__started = True _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack) def run(self): if self.__target: self.__target(*self.__args, **self.__kwargs) def __bootstrap(self): # Wrapper around the real bootstrap code that ignores # exceptions during interpreter cleanup. Those typically # happen when a daemon thread wakes up at an unfortunate # moment, finds the world around it destroyed, and raises some # random exception *** while trying to report the exception in # __bootstrap_inner() below ***. Those random exceptions # don't help anybody, and they confuse users, so we suppress # them. 
We suppress them only when it appears that the world # indeed has already been destroyed, so that exceptions in # __bootstrap_inner() during normal business hours are properly # reported. Also, we only suppress them for daemonic threads; # if a non-daemonic encounters this, something else is wrong. try: self.__bootstrap_inner() except: if self.__daemonic and _sys is None: return raise def __bootstrap_inner(self): try: self.__started = True _active_limbo_lock.acquire() _active[_get_ident()] = self del _limbo[self] _active_limbo_lock.release() if __debug__: self._note("%s.__bootstrap(): thread started", self) if _trace_hook: self._note("%s.__bootstrap(): registering trace hook", self) _sys.settrace(_trace_hook) if _profile_hook: self._note("%s.__bootstrap(): registering profile hook", self) _sys.setprofile(_profile_hook) try: self.run() except SystemExit: if __debug__: self._note("%s.__bootstrap(): raised SystemExit", self) except: if __debug__: self._note("%s.__bootstrap(): unhandled exception", self) # If sys.stderr is no more (most likely from interpreter # shutdown) use self.__stderr. Otherwise still use sys (as in # _sys) in case sys.stderr was redefined since the creation of # self. if _sys: _sys.stderr.write("Exception in thread %s:\n%s\n" % (self.getName(), _format_exc())) else: # Do the best job possible w/o a huge amt. of code to # approximate a traceback (code ideas from # Lib/traceback.py) exc_type, exc_value, exc_tb = self.__exc_info() try: print(( "Exception in thread " + self.getName() + " (most likely raised during interpreter shutdown):"), file=self.__stderr) print(( "Traceback (most recent call last):"), file=self.__stderr) while exc_tb: print(( ' File "%s", line %s, in %s' % (exc_tb.tb_frame.f_code.co_filename, exc_tb.tb_lineno, exc_tb.tb_frame.f_code.co_name)), file=self.__stderr) exc_tb = exc_tb.tb_next print(("%s: %s" % (exc_type, exc_value)), file=self.__stderr) # Make sure that exc_tb gets deleted since it is a memory # hog; deleting everything else is just for thoroughness finally: del exc_type, exc_value, exc_tb else: if __debug__: self._note("%s.__bootstrap(): normal return", self) finally: _active_limbo_lock.acquire() try: self.__stop() try: # We don't call self.__delete() because it also # grabs _active_limbo_lock. del _active[_get_ident()] except: pass finally: _active_limbo_lock.release() def __stop(self): self.__block.acquire() self.__stopped = True self.__block.notifyAll() self.__block.release() def __delete(self): "Remove current thread from the dict of currently running threads." # Notes about running with dummy_thread: # # Must take care to not raise an exception if dummy_thread is being # used (and thus this module is being used as an instance of # dummy_threading). dummy_thread.get_ident() always returns -1 since # there is only one thread if dummy_thread is being used. Thus # len(_active) is always <= 1 here, and any Thread instance created # overwrites the (if any) thread currently registered in _active. # # An instance of _MainThread is always created by 'threading'. This # gets overwritten the instant an instance of Thread is created; both # threads return -1 from dummy_thread.get_ident() and thus have the # same key in the dict. So when the _MainThread instance created by # 'threading' tries to clean itself up when atexit calls this method # it gets a KeyError if another Thread instance was created. # # This all means that KeyError from trying to delete something from # _active if dummy_threading is being used is a red herring. 
But # since it isn't if dummy_threading is *not* being used then don't # hide the exception. _active_limbo_lock.acquire() try: try: del _active[_get_ident()] except KeyError: if 'dummy_threading' not in _sys.modules: raise finally: _active_limbo_lock.release() def join(self, timeout=None): if not self.__initialized: raise RuntimeError("Thread.__init__() not called") if not self.__started: raise RuntimeError("cannot join thread before it is started") if self is currentThread(): raise RuntimeError("cannot join current thread") if __debug__: if not self.__stopped: self._note("%s.join(): waiting until thread stops", self) self.__block.acquire() try: if timeout is None: while not self.__stopped: self.__block.wait() if __debug__: self._note("%s.join(): thread stopped", self) else: deadline = _time() + timeout while not self.__stopped: delay = deadline - _time() if delay <= 0: if __debug__: self._note("%s.join(): timed out", self) break self.__block.wait(delay) else: if __debug__: self._note("%s.join(): thread stopped", self) finally: self.__block.release() def getName(self): assert self.__initialized, "Thread.__init__() not called" return self.__name def setName(self, name): assert self.__initialized, "Thread.__init__() not called" self.__name = str(name) def isAlive(self): assert self.__initialized, "Thread.__init__() not called" return self.__started and not self.__stopped def isDaemon(self): assert self.__initialized, "Thread.__init__() not called" return self.__daemonic def setDaemon(self, daemonic): if not self.__initialized: raise RuntimeError("Thread.__init__() not called") if self.__started: raise RuntimeError("cannot set daemon status of active thread"); self.__daemonic = daemonic # The timer class was contributed by Itamar Shtull-Trauring def Timer(*args, **kwargs): return _Timer(*args, **kwargs) class _Timer(Thread): """Call a function after a specified number of seconds: t = Timer(30.0, f, args=[], kwargs={}) t.start() t.cancel() # stop the timer's action if it's still waiting """ def __init__(self, interval, function, args=[], kwargs={}): Thread.__init__(self) self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.finished = Event() def cancel(self): """Stop the timer if it hasn't finished yet""" self.finished.set() def run(self): self.finished.wait(self.interval) if not self.finished.isSet(): self.function(*self.args, **self.kwargs) self.finished.set() # Special thread class to represent the main thread # This is garbage collected through an exit handler class _MainThread(Thread): def __init__(self): Thread.__init__(self, name="MainThread") self._Thread__started = True _active_limbo_lock.acquire() _active[_get_ident()] = self _active_limbo_lock.release() def _set_daemon(self): return False def _exitfunc(self): self._Thread__stop() t = _pickSomeNonDaemonThread() if t: if __debug__: self._note("%s: waiting for other threads", self) while t: t.join() t = _pickSomeNonDaemonThread() if __debug__: self._note("%s: exiting", self) self._Thread__delete() def _pickSomeNonDaemonThread(): for t in enumerate(): if not t.isDaemon() and t.isAlive(): return t return None # Dummy thread class to represent threads not started here. # These aren't garbage collected when they die, nor can they be waited for. # If they invoke anything in threading.py that calls currentThread(), they # leave an entry in the _active dict forever after. # Their purpose is to return *something* from currentThread(). 
# They are marked as daemon threads so we won't wait for them # when we exit (conform previous semantics). class _DummyThread(Thread): def __init__(self): Thread.__init__(self, name=_newname("Dummy-%d")) # Thread.__block consumes an OS-level locking primitive, which # can never be used by a _DummyThread. Since a _DummyThread # instance is immortal, that's bad, so release this resource. del self._Thread__block self._Thread__started = True _active_limbo_lock.acquire() _active[_get_ident()] = self _active_limbo_lock.release() def _set_daemon(self): return True def join(self, timeout=None): assert False, "cannot join a dummy thread" # Global API functions def currentThread(): try: return _active[_get_ident()] except KeyError: ##print "currentThread(): no current thread for", _get_ident() return _DummyThread() def activeCount(): _active_limbo_lock.acquire() count = len(_active) + len(_limbo) _active_limbo_lock.release() return count def enumerate(): _active_limbo_lock.acquire() active = list(_active.values()) + list(_limbo.values()) _active_limbo_lock.release() return active try: from .thread import stack_size __all__.append('stack_size') except ImportError: pass # Create the main thread object, # and make it available for the interpreter # (Py_Main) as threading._shutdown. _shutdown = _MainThread()._exitfunc # get thread-local implementation, either from the thread # module, or from the python fallback try: from .thread import _local as local except ImportError: from _threading_local import local # Self-test code def _test(): class BoundedQueue(_Verbose): def __init__(self, limit): _Verbose.__init__(self) self.mon = RLock() self.rc = Condition(self.mon) self.wc = Condition(self.mon) self.limit = limit self.queue = deque() def put(self, item): self.mon.acquire() while len(self.queue) >= self.limit: self._note("put(%s): queue full", item) self.wc.wait() self.queue.append(item) self._note("put(%s): appended, length now %d", item, len(self.queue)) self.rc.notify() self.mon.release() def get(self): self.mon.acquire() while not self.queue: self._note("get(): queue empty") self.rc.wait() item = self.queue.popleft() self._note("get(): got %s, %d left", item, len(self.queue)) self.wc.notify() self.mon.release() return item class ProducerThread(Thread): def __init__(self, queue, quota): Thread.__init__(self, name="Producer") self.queue = queue self.quota = quota def run(self): from random import random counter = 0 while counter < self.quota: counter = counter + 1 self.queue.put("%s.%d" % (self.getName(), counter)) _sleep(random() * 0.00001) class ConsumerThread(Thread): def __init__(self, queue, count): Thread.__init__(self, name="Consumer") self.queue = queue self.count = count def run(self): while self.count > 0: item = self.queue.get() print(item) self.count = self.count - 1 NP = 3 QL = 4 NI = 5 Q = BoundedQueue(QL) P = [] for i in range(NP): t = ProducerThread(Q, NI) t.setName("Producer-%d" % (i+1)) P.append(t) C = ConsumerThread(Q, NI*NP) for t in P: t.start() _sleep(0.000001) C.start() for t in P: t.join() C.join() if __name__ == '__main__': _test() diff --git a/eventlib/green/urllib.py b/eventlib/green/urllib.py index 80e2464..924b079 100644 --- a/eventlib/green/urllib.py +++ b/eventlib/green/urllib.py @@ -1,650 +1,644 @@ -urllib = __import__('urllib') -for var in dir(urllib): - exec("%s = urllib.%s" % (var, var)) - -# import the following to be a better drop-in replacement -__import_lst = ['__all__', '__version__', 'MAXFTPCACHE', 'ContentTooShortError', - 'ftpcache', '_noheaders', 'noheaders', 
'addbase', 'addclosehook', - 'addinfo', 'addinfourl', '_is_unicode', 'toBytes', '_hextochr', - 'always_safe', 'getproxies_environment', 'proxy_bypass'] - -for var in __import_lst: - exec("%s = urllib.%s" % (var, var)) - -from eventlib.green import socket import os -from eventlib.green import time import sys +import urllib +from eventlib.green import socket, time +from urllib.error import( __all__ ) +from urllib.response import( __all__ ) +from urllib.robotparser import( __all__ ) +from urllib.request import( __all__, __version__, MAXFTPCACHE, ftpcache, ftpwrapper, _noheaders, noheaders, proxy_bypass ) +from urllib.parse import( __all__, splithost, splituser, splittype, splitattr, splitpasswd, splitport, splitquery, splitvalue) from urllib.parse import urljoin as basejoin +parse = urllib.parse.parse_qs + # Shortcut for basic usage _urlopener = None def urlopen(url, data=None, proxies=None): """urlopen(url [, data]) -> open file-like object""" global _urlopener if proxies is not None: opener = FancyURLopener(proxies=proxies) elif not _urlopener: opener = FancyURLopener() _urlopener = opener else: opener = _urlopener if data is None: return opener.open(url) else: return opener.open(url, data) def urlretrieve(url, filename=None, reporthook=None, data=None): global _urlopener if not _urlopener: _urlopener = FancyURLopener() return _urlopener.retrieve(url, filename, reporthook, data) def urlcleanup(): if _urlopener: _urlopener.cleanup() class URLopener(urllib.request.URLopener): def open_http(self, url, data=None): """Use HTTP protocol.""" from eventlib.green import httplib user_passwd = None proxy_passwd= None if isinstance(url, str): host, selector = splithost(url) if host: user_passwd, host = splituser(host) host = unquote(host) realhost = host else: host, selector = url # check whether the proxy contains authorization information proxy_passwd, host = splituser(host) # now we proceed with the url we want to obtain urltype, rest = splittype(selector) url = rest user_passwd = None if urltype.lower() != 'http': realhost = None else: realhost, rest = splithost(rest) if realhost: user_passwd, realhost = splituser(realhost) if user_passwd: selector = "%s://%s%s" % (urltype, realhost, rest) if proxy_bypass(realhost): host = realhost #print "proxy via http:", host, selector if not host: raise IOError('http error', 'no host given') if proxy_passwd: import base64 proxy_auth = base64.b64encode(proxy_passwd).strip() else: proxy_auth = None if user_passwd: import base64 auth = base64.b64encode(user_passwd).strip() else: auth = None h = httplib.HTTP(host) if data is not None: h.putrequest('POST', selector) h.putheader('Content-Type', 'application/x-www-form-urlencoded') h.putheader('Content-Length', '%d' % len(data)) else: h.putrequest('GET', selector) if proxy_auth: h.putheader('Proxy-Authorization', 'Basic %s' % proxy_auth) if auth: h.putheader('Authorization', 'Basic %s' % auth) if realhost: h.putheader('Host', realhost) for args in self.addheaders: h.putheader(*args) h.endheaders() if data is not None: h.send(data) errcode, errmsg, headers = h.getreply() if errcode == -1: # something went wrong with the HTTP status line raise IOError('http protocol error', 0, 'got a bad status line', None) fp = h.getfile() if errcode == 200: return addinfourl(fp, headers, "http:" + url) else: if data is None: return self.http_error(url, fp, errcode, errmsg, headers) else: return self.http_error(url, fp, errcode, errmsg, headers, data) if hasattr(socket, "ssl"): def open_https(self, url, data=None): """Use HTTPS 
protocol.""" from eventlib.green import httplib user_passwd = None proxy_passwd = None if isinstance(url, str): host, selector = splithost(url) if host: user_passwd, host = splituser(host) host = unquote(host) realhost = host else: host, selector = url # here, we determine, whether the proxy contains authorization information proxy_passwd, host = splituser(host) urltype, rest = splittype(selector) url = rest user_passwd = None if urltype.lower() != 'https': realhost = None else: realhost, rest = splithost(rest) if realhost: user_passwd, realhost = splituser(realhost) if user_passwd: selector = "%s://%s%s" % (urltype, realhost, rest) #print "proxy via https:", host, selector if not host: raise IOError('https error', 'no host given') if proxy_passwd: import base64 proxy_auth = base64.b64encode(proxy_passwd).strip() else: proxy_auth = None if user_passwd: import base64 auth = base64.b64encode(user_passwd).strip() else: auth = None h = httplib.HTTPS(host, 0, key_file=self.key_file, cert_file=self.cert_file) if data is not None: h.putrequest('POST', selector) h.putheader('Content-Type', 'application/x-www-form-urlencoded') h.putheader('Content-Length', '%d' % len(data)) else: h.putrequest('GET', selector) if proxy_auth: h.putheader('Proxy-Authorization', 'Basic %s' % proxy_auth) if auth: h.putheader('Authorization', 'Basic %s' % auth) if realhost: h.putheader('Host', realhost) for args in self.addheaders: h.putheader(*args) h.endheaders() if data is not None: h.send(data) errcode, errmsg, headers = h.getreply() if errcode == -1: # something went wrong with the HTTP status line raise IOError('http protocol error', 0, 'got a bad status line', None) fp = h.getfile() if errcode == 200: return addinfourl(fp, headers, "https:" + url) else: if data is None: return self.http_error(url, fp, errcode, errmsg, headers) else: return self.http_error(url, fp, errcode, errmsg, headers, data) def open_gopher(self, url): """Use Gopher protocol.""" if not isinstance(url, str): raise IOError('gopher error', 'proxy support for gopher protocol currently not implemented') from eventlib.green import gopherlib host, selector = splithost(url) if not host: raise IOError('gopher error', 'no host given') host = unquote(host) type, selector = splitgophertype(selector) selector, query = splitquery(selector) selector = unquote(selector) if query: query = unquote(query) fp = gopherlib.send_query(selector, query, host) else: fp = gopherlib.send_selector(selector, host) return addinfourl(fp, noheaders(), "gopher:" + url) def open_local_file(self, url): """Use local file.""" import mimetypes, mimetools, email.Utils try: from io import StringIO except ImportError: from io import StringIO host, file = splithost(url) localname = url2pathname(file) try: stats = os.stat(localname) except OSError as e: raise IOError(e.errno, e.strerror, e.filename) size = stats.st_size modified = email.Utils.formatdate(stats.st_mtime, usegmt=True) mtype = mimetypes.guess_type(url)[0] headers = mimetools.Message(StringIO( 'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' % (mtype or 'text/plain', size, modified))) if not host: urlfile = file if file[:1] == '/': urlfile = 'file://' + file return addinfourl(open(localname, 'rb'), headers, urlfile) host, port = splitport(host) if not port \ and socket.gethostbyname(host) in (localhost(), thishost()): urlfile = file if file[:1] == '/': urlfile = 'file://' + file return addinfourl(open(localname, 'rb'), headers, urlfile) raise IOError('local file error', 'not on local host') def open_ftp(self, 
url): """Use FTP protocol.""" if not isinstance(url, str): raise IOError('ftp error', 'proxy support for ftp protocol currently not implemented') import mimetypes, mimetools try: from io import StringIO except ImportError: from io import StringIO host, path = splithost(url) if not host: raise IOError('ftp error', 'no host given') host, port = splitport(host) user, host = splituser(host) if user: user, passwd = splitpasswd(user) else: passwd = None host = unquote(host) user = unquote(user or '') passwd = unquote(passwd or '') host = socket.gethostbyname(host) if not port: from eventlib.green import ftplib port = ftplib.FTP_PORT else: port = int(port) path, attrs = splitattr(path) path = unquote(path) dirs = path.split('/') dirs, file = dirs[:-1], dirs[-1] if dirs and not dirs[0]: dirs = dirs[1:] if dirs and not dirs[0]: dirs[0] = '/' key = user, host, port, '/'.join(dirs) # XXX thread unsafe! if len(self.ftpcache) > MAXFTPCACHE: # Prune the cache, rather arbitrarily for k in list(self.ftpcache.keys()): if k != key: v = self.ftpcache[k] del self.ftpcache[k] v.close() try: if not key in self.ftpcache: self.ftpcache[key] = \ ftpwrapper(user, passwd, host, port, dirs) if not file: type = 'D' else: type = 'I' for attr in attrs: attr, value = splitvalue(attr) if attr.lower() == 'type' and \ value in ('a', 'A', 'i', 'I', 'd', 'D'): type = value.upper() (fp, retrlen) = self.ftpcache[key].retrfile(file, type) mtype = mimetypes.guess_type("ftp:" + url)[0] headers = "" if mtype: headers += "Content-Type: %s\n" % mtype if retrlen is not None and retrlen >= 0: headers += "Content-Length: %d\n" % retrlen headers = mimetools.Message(StringIO(headers)) return addinfourl(fp, headers, "ftp:" + url) except ftperrors() as msg: raise IOError('ftp error', msg).with_traceback(sys.exc_info()[2]) # this one is copied verbatim class FancyURLopener(URLopener): """Derived class with handlers for errors we can handle (perhaps).""" def __init__(self, *args, **kwargs): URLopener.__init__(self, *args, **kwargs) self.auth_cache = {} self.tries = 0 self.maxtries = 10 def http_error_default(self, url, fp, errcode, errmsg, headers): """Default error handling -- don't raise an exception.""" return addinfourl(fp, headers, "http:" + url) def http_error_302(self, url, fp, errcode, errmsg, headers, data=None): """Error 302 -- relocated (temporarily).""" self.tries += 1 if self.maxtries and self.tries >= self.maxtries: if hasattr(self, "http_error_500"): meth = self.http_error_500 else: meth = self.http_error_default self.tries = 0 return meth(url, fp, 500, "Internal Server Error: Redirect Recursion", headers) result = self.redirect_internal(url, fp, errcode, errmsg, headers, data) self.tries = 0 return result def redirect_internal(self, url, fp, errcode, errmsg, headers, data): if 'location' in headers: newurl = headers['location'] elif 'uri' in headers: newurl = headers['uri'] else: return void = fp.read() fp.close() # In case the server sent a relative URL, join with original: newurl = basejoin(self.type + ":" + url, newurl) return self.open(newurl) def http_error_301(self, url, fp, errcode, errmsg, headers, data=None): """Error 301 -- also relocated (permanently).""" return self.http_error_302(url, fp, errcode, errmsg, headers, data) def http_error_303(self, url, fp, errcode, errmsg, headers, data=None): """Error 303 -- also relocated (essentially identical to 302).""" return self.http_error_302(url, fp, errcode, errmsg, headers, data) def http_error_307(self, url, fp, errcode, errmsg, headers, data=None): """Error 307 -- 
relocated, but turn POST into error.""" if data is None: return self.http_error_302(url, fp, errcode, errmsg, headers, data) else: return self.http_error_default(url, fp, errcode, errmsg, headers) def http_error_401(self, url, fp, errcode, errmsg, headers, data=None): """Error 401 -- authentication required. This function supports Basic authentication only.""" if not 'www-authenticate' in headers: URLopener.http_error_default(self, url, fp, errcode, errmsg, headers) stuff = headers['www-authenticate'] import re match = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', stuff) if not match: URLopener.http_error_default(self, url, fp, errcode, errmsg, headers) scheme, realm = match.groups() if scheme.lower() != 'basic': URLopener.http_error_default(self, url, fp, errcode, errmsg, headers) name = 'retry_' + self.type + '_basic_auth' if data is None: return getattr(self,name)(url, realm) else: return getattr(self,name)(url, realm, data) def http_error_407(self, url, fp, errcode, errmsg, headers, data=None): """Error 407 -- proxy authentication required. This function supports Basic authentication only.""" if not 'proxy-authenticate' in headers: URLopener.http_error_default(self, url, fp, errcode, errmsg, headers) stuff = headers['proxy-authenticate'] import re match = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', stuff) if not match: URLopener.http_error_default(self, url, fp, errcode, errmsg, headers) scheme, realm = match.groups() if scheme.lower() != 'basic': URLopener.http_error_default(self, url, fp, errcode, errmsg, headers) name = 'retry_proxy_' + self.type + '_basic_auth' if data is None: return getattr(self,name)(url, realm) else: return getattr(self,name)(url, realm, data) def retry_proxy_http_basic_auth(self, url, realm, data=None): host, selector = splithost(url) newurl = 'http://' + host + selector proxy = self.proxies['http'] urltype, proxyhost = splittype(proxy) proxyhost, proxyselector = splithost(proxyhost) i = proxyhost.find('@') + 1 proxyhost = proxyhost[i:] user, passwd = self.get_user_passwd(proxyhost, realm, i) if not (user or passwd): return None proxyhost = quote(user, safe='') + ':' + quote(passwd, safe='') + '@' + proxyhost self.proxies['http'] = 'http://' + proxyhost + proxyselector if data is None: return self.open(newurl) else: return self.open(newurl, data) def retry_proxy_https_basic_auth(self, url, realm, data=None): host, selector = splithost(url) newurl = 'https://' + host + selector proxy = self.proxies['https'] urltype, proxyhost = splittype(proxy) proxyhost, proxyselector = splithost(proxyhost) i = proxyhost.find('@') + 1 proxyhost = proxyhost[i:] user, passwd = self.get_user_passwd(proxyhost, realm, i) if not (user or passwd): return None proxyhost = quote(user, safe='') + ':' + quote(passwd, safe='') + '@' + proxyhost self.proxies['https'] = 'https://' + proxyhost + proxyselector if data is None: return self.open(newurl) else: return self.open(newurl, data) def retry_http_basic_auth(self, url, realm, data=None): host, selector = splithost(url) i = host.find('@') + 1 host = host[i:] user, passwd = self.get_user_passwd(host, realm, i) if not (user or passwd): return None host = quote(user, safe='') + ':' + quote(passwd, safe='') + '@' + host newurl = 'http://' + host + selector if data is None: return self.open(newurl) else: return self.open(newurl, data) def retry_https_basic_auth(self, url, realm, data=None): host, selector = splithost(url) i = host.find('@') + 1 host = host[i:] user, passwd = self.get_user_passwd(host, realm, i) if not (user or passwd): 
return None host = quote(user, safe='') + ':' + quote(passwd, safe='') + '@' + host newurl = 'https://' + host + selector if data is None: return self.open(newurl) else: return self.open(newurl, data) def get_user_passwd(self, host, realm, clear_cache = 0): key = realm + '@' + host.lower() if key in self.auth_cache: if clear_cache: del self.auth_cache[key] else: return self.auth_cache[key] user, passwd = self.prompt_user_passwd(host, realm) if user or passwd: self.auth_cache[key] = (user, passwd) return user, passwd def prompt_user_passwd(self, host, realm): """Override this in a GUI environment!""" import getpass try: user = input("Enter username for %s at %s: " % (realm, host)) passwd = getpass.getpass("Enter password for %s in %s at %s: " % (user, realm, host)) return user, passwd except KeyboardInterrupt: print() return None, None # Utility functions _localhost = None def localhost(): """Return the IP address of the magic hostname 'localhost'.""" global _localhost if _localhost is None: _localhost = socket.gethostbyname('localhost') return _localhost _thishost = None def thishost(): """Return the IP address of the current host.""" global _thishost if _thishost is None: _thishost = socket.gethostbyname(socket.gethostname()) return _thishost _ftperrors = None def ftperrors(): """Return the set of errors raised by the FTP class.""" global _ftperrors if _ftperrors is None: from eventlib.green import ftplib _ftperrors = ftplib.all_errors return _ftperrors # Utility classes -class ftpwrapper(urllib.ftpwrapper): +class ftpwrapper(ftpwrapper): """Class used by open_ftp() for cache of open FTP connections.""" def init(self): from eventlib.green import ftplib self.busy = 0 self.ftp = ftplib.FTP() self.ftp.connect(self.host, self.port) self.ftp.login(self.user, self.passwd) for dir in self.dirs: self.ftp.cwd(dir) def retrfile(self, file, type): from eventlib.green import ftplib self.endtransfer() if type in ('d', 'D'): cmd = 'TYPE A'; isdir = 1 else: cmd = 'TYPE ' + type; isdir = 0 try: self.ftp.voidcmd(cmd) except ftplib.all_errors: self.init() self.ftp.voidcmd(cmd) conn = None if file and not isdir: # Try to retrieve as a file try: cmd = 'RETR ' + file conn = self.ftp.ntransfercmd(cmd) except ftplib.error_perm as reason: if str(reason)[:3] != '550': raise IOError('ftp error', reason).with_traceback(sys.exc_info()[2]) if not conn: # Set transfer mode to ASCII! 
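# Reaching this branch means no data connection was opened: either no
# file was requested, a directory was requested, or RETR failed with a
# 550 reply; switch to ASCII type and fall back to a LIST transfer.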
self.ftp.voidcmd('TYPE A') # Try a directory listing if file: cmd = 'LIST ' + file else: cmd = 'LIST' conn = self.ftp.ntransfercmd(cmd) self.busy = 1 # Pass back both a suitably decorated object and a retrieval length return (addclosehook(conn[0].makefile('rb'), self.endtransfer), conn[1]) # Test and time quote() and unquote() def test1(): s = '' for i in range(256): s = s + chr(i) s = s*4 t0 = time.time() qs = quote(s) uqs = unquote(qs) t1 = time.time() if uqs != s: print('Wrong!') print(repr(s)) print(repr(qs)) print(repr(uqs)) print(round(t1 - t0, 3), 'sec') def reporthook(blocknum, blocksize, totalsize): # Report during remote transfers print("Block number: %d, Block size: %d, Total size: %d" % ( blocknum, blocksize, totalsize)) # Test program def test(args=[]): if not args: args = [ '/etc/passwd', 'file:/etc/passwd', 'file://localhost/etc/passwd', 'ftp://ftp.gnu.org/pub/README', ## 'gopher://gopher.micro.umn.edu/1/', 'http://www.python.org/index.html', ] if hasattr(URLopener, "open_https"): args.append('https://synergy.as.cmu.edu/~geek/') try: for url in args: print('-'*10, url, '-'*10) fn, h = urlretrieve(url, None, reporthook) print(fn) if h: print('======') for k in list(h.keys()): print(k + ':', h[k]) print('======') fp = open(fn, 'rb') data = fp.read() del fp if '\r' in data: table = string.maketrans("", "") data = data.translate(table, "\r") print(data) fn, h = None, None print('-'*40) finally: urlcleanup() def main(): import getopt, sys try: opts, args = getopt.getopt(sys.argv[1:], "th") except getopt.error as msg: print(msg) print("Use -h for help") return t = 0 for o, a in opts: if o == '-t': t = t + 1 if o == '-h': print("Usage: python urllib.py [-t] [url ...]") print("-t runs self-test;", end=' ') print("otherwise, contents of urls are printed") return if t: if t > 1: test1() test(args) else: if not args: print("Use -h for help") for url in args: print(urlopen(url).read(), end=' ') # Run test program when run as a script if __name__ == '__main__': main() diff --git a/eventlib/greenio.py b/eventlib/greenio.py index c59cfae..fa1889c 100644 --- a/eventlib/greenio.py +++ b/eventlib/greenio.py @@ -1,534 +1,547 @@ # Copyright (c) 2005-2006, Bob Ippolito # Copyright (c) 2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
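# A minimal usage sketch for the green I/O layer defined below; it assumes
# the eventlib.api helpers shown earlier in this patch (tcp_listener,
# tcp_server). The handler and port are illustrative only, and error
# handling is elided.

def _example_echo_server(port=6000):
    from eventlib import api

    def echo(sock_and_addr):
        # tcp_server() invokes the handler with the (socket, address)
        # pair returned by accept() on the listening socket
        sock, addr = sock_and_addr
        while True:
            data = sock.recv(4096)     # cooperatively yields until readable
            if not data:
                break                  # peer closed the connection
            sock.sendall(data)         # cooperatively yields until writable
        sock.close()

    api.tcp_server(api.tcp_listener(('127.0.0.1', port)), echo)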
from eventlib.api import trampoline, get_hub BUFFER_SIZE = 4096 import errno import os import sys import socket from socket import socket as _original_socket import time __all__ = ['GreenSocket', 'GreenFile', 'GreenPipe'] def higher_order_recv(recv_func): def recv(self, buflen): if self.act_non_blocking: return self.fd.recv(buflen) buf = self.recvbuffer if buf: chunk, self.recvbuffer = buf[:buflen], buf[buflen:] return chunk fd = self.fd - bytes = recv_func(fd, buflen) + _bytes = recv_func(fd, buflen) if self.gettimeout(): end = time.time()+self.gettimeout() else: end = None timeout = None - while bytes is None: + while _bytes is None: try: if end: timeout = end - time.time() trampoline(fd, read=True, timeout=timeout, timeout_exc=socket.timeout) except socket.timeout: raise except socket.error as e: if e.args[0] == errno.EPIPE: - bytes = '' + _bytes = b'' else: raise else: - bytes = recv_func(fd, buflen) - self.recvcount += len(bytes) - return bytes + _bytes = recv_func(fd, buflen) + self.recvcount += len(_bytes) + return _bytes return recv def higher_order_send(send_func): def send(self, data): + if not isinstance(data, bytes): + data = data.encode() if self.act_non_blocking: return self.fd.send(data) count = send_func(self.fd, data) if not count: + return self.fd.send(data) return 0 self.sendcount += count return count return send if sys.platform == 'win32': CONNECT_ERR = (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, errno.WSAEINVAL, errno.WSAEWOULDBLOCK) CONNECT_SUCCESS = (0, errno.EISCONN, errno.WSAEISCONN) else: CONNECT_ERR = (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK) CONNECT_SUCCESS = (0, errno.EISCONN) def socket_connect(descriptor, address): err = descriptor.connect_ex(address) if err in CONNECT_ERR: return None if err not in CONNECT_SUCCESS: raise socket.error(err, errno.errorcode[err]) return descriptor if sys.platform == 'win32': BLOCKING_ERR = (errno.EWOULDBLOCK, errno.WSAEWOULDBLOCK) else: BLOCKING_ERR = (errno.EWOULDBLOCK, ) def socket_accept(descriptor): try: return descriptor.accept() except socket.error as e: if e.args[0] in BLOCKING_ERR: return None raise -def socket_send(descriptor, data): +def socket_send(descriptor, data: bytes): try: return descriptor.send(data) except socket.error as e: - if e.args[0] in BLOCKING_ERR + errno.ENOTCONN: + if e.args[0] in BLOCKING_ERR + (errno.ENOTCONN, ): return 0 raise # winsock sometimes throws ENOTCONN SOCKET_CLOSED = (errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN) def socket_recv(descriptor, buflen): try: return descriptor.recv(buflen) except socket.error as e: if e.args[0] in BLOCKING_ERR: return None if e.args[0] in SOCKET_CLOSED: return b'' raise def file_recv(fd, buflen): try: return fd.read(buflen) except IOError as e: if e.errno == errno.EAGAIN: return None return b'' except socket.error as e: if e.errno == errno.EPIPE: return b'' raise def file_send(fd, data): try: fd.write(data) fd.flush() return len(data) except IOError as e: if e.errno == errno.EAGAIN: return 0 except ValueError: return 0 except socket.error as e: if e.errno == errno.EPIPE: return 0 def set_nonblocking(fd): try: setblocking = fd.setblocking except AttributeError: # This version of Python predates socket.setblocking() import fcntl fileno = fd.fileno() flags = fcntl.fcntl(fileno, fcntl.F_GETFL) fcntl.fcntl(fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK) else: # socket supports setblocking() setblocking(0) class GreenSocket(object): is_secure = False timeout = None def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs): if
isinstance(family_or_realsock, int): fd = _original_socket(family_or_realsock, *args, **kwargs) else: fd = family_or_realsock assert not args, args assert not kwargs, kwargs set_nonblocking(fd) self.fd = fd self._fileno = fd.fileno() self.sendcount = 0 self.recvcount = 0 - self.recvbuffer = '' + self.recvbuffer = b'' self.closed = False self.timeout = socket.getdefaulttimeout() # when client calls setblocking(0) or settimeout(0) the socket must # act non-blocking self.act_non_blocking = False @property def family(self): return self.fd.family @property def type(self): return self.fd.type @property def proto(self): return self.fd.proto def accept(self): if self.act_non_blocking: return self.fd.accept() fd = self.fd while True: res = socket_accept(fd) if res is not None: client, addr = res set_nonblocking(client) return type(self)(client), addr trampoline(fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout) def bind(self, *args, **kw): fn = self.bind = self.fd.bind return fn(*args, **kw) + + def _decref_socketios(self): + if self.closed: + self.close() + def close(self, *args, **kw): if self.closed: return self.closed = True if self.is_secure: # *NOTE: This is not quite the correct SSL shutdown sequence. # We should actually be checking the return value of shutdown. # Note also that this is not the same as calling self.shutdown(). self.fd.shutdown() fn = self.close = self.fd.close try: res = fn(*args, **kw) finally: # This will raise socket.error(32, 'Broken pipe') if there's # a caller waiting on trampoline (e.g. server on .accept()) get_hub().exc_descriptor(self._fileno) return res def connect(self, address): if self.act_non_blocking: return self.fd.connect(address) fd = self.fd if self.gettimeout() is None: while not socket_connect(fd, address): trampoline(fd, write=True, timeout_exc=socket.timeout) else: end = time.time() + self.gettimeout() while True: if socket_connect(fd, address): return if time.time() >= end: raise socket.timeout trampoline(fd, write=True, timeout=end-time.time(), timeout_exc=socket.timeout) def connect_ex(self, address): if self.act_non_blocking: return self.fd.connect_ex(address) fd = self.fd if self.gettimeout() is None: while not socket_connect(fd, address): try: trampoline(fd, write=True, timeout_exc=socket.timeout) except socket.error as ex: return ex.args[0] else: end = time.time() + self.gettimeout() while True: if socket_connect(fd, address): return 0 if time.time() >= end: raise socket.timeout try: trampoline(fd, write=True, timeout=end-time.time(), timeout_exc=socket.timeout) except socket.error as ex: return ex.args[0] def dup(self, *args, **kw): sock = self.fd.dup(*args, **kw) set_nonblocking(sock) newsock = type(self)(sock) newsock.settimeout(self.timeout) return newsock def fileno(self, *args, **kw): fn = self.fileno = self.fd.fileno return fn(*args, **kw) def getpeername(self, *args, **kw): fn = self.getpeername = self.fd.getpeername return fn(*args, **kw) def getsockname(self, *args, **kw): fn = self.getsockname = self.fd.getsockname return fn(*args, **kw) def getsockopt(self, *args, **kw): fn = self.getsockopt = self.fd.getsockopt return fn(*args, **kw) def listen(self, *args, **kw): fn = self.listen = self.fd.listen return fn(*args, **kw) def makefile(self, mode='r', bufsize=-1): - return socket._fileobject(self.dup(), mode, bufsize) + return socket.SocketIO(self.dup(), mode) def makeGreenFile(self, mode='r', bufsize=-1): return GreenFile(self.dup()) recv = higher_order_recv(socket_recv) def recvfrom(self, *args): if not
self.act_non_blocking: trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout) return self.fd.recvfrom(*args) def recvfrom_into(self, *args): if not self.act_non_blocking: trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout) return self.fd.recvfrom_into(*args) def recv_into(self, *args): if not self.act_non_blocking: trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout) return self.fd.recv_into(*args) send = higher_order_send(socket_send) def sendall(self, data): fd = self.fd tail = self.send(data) while tail < len(data): trampoline(self.fd, write=True, timeout_exc=socket.timeout) tail += self.send(data[tail:]) def sendto(self, *args): trampoline(self.fd, write=True, timeout_exc=socket.timeout) return self.fd.sendto(*args) def setblocking(self, flag): if flag: self.act_non_blocking = False self.timeout = None else: self.act_non_blocking = True self.timeout = 0.0 def setsockopt(self, *args, **kw): fn = self.setsockopt = self.fd.setsockopt return fn(*args, **kw) def shutdown(self, *args, **kw): if self.is_secure: fn = self.shutdown = self.fd.sock_shutdown else: fn = self.shutdown = self.fd.shutdown return fn(*args, **kw) def settimeout(self, howlong): if howlong is None: self.setblocking(True) return try: f = howlong.__float__ except AttributeError: raise TypeError('a float is required') howlong = f() if howlong < 0.0: raise ValueError('Timeout value out of range') if howlong == 0.0: self.setblocking(howlong) else: self.timeout = howlong def gettimeout(self): return self.timeout def read(self, size=None): if size is not None and not isinstance(size, int): raise TypeError('Expecting an int or long for size, got %s: %s' % (type(size), repr(size))) buf, self.sock.recvbuffer = self.sock.recvbuffer, b'' lst = [buf] if size is None: while True: d = self.sock.recv(BUFFER_SIZE) if not d: break lst.append(d) else: buflen = len(buf) while buflen < size: d = self.sock.recv(BUFFER_SIZE) if not d: break buflen += len(d) lst.append(d) else: d = lst[-1] overbite = buflen - size if overbite: lst[-1], self.sock.recvbuffer = d[:-overbite], d[-overbite:] else: lst[-1], self.sock.recvbuffer = d, b'' - return ''.join(lst) + if isinstance(lst[0], bytes): + stringlst = [x.decode('utf-8') for x in lst] + return ''.join(stringlst) + else: + return ''.join(lst) + class GreenFile(object): newlines = '\r\n' mode = 'wb+' def __init__(self, fd): if isinstance(fd, GreenSocket): set_nonblocking(fd.fd) else: set_nonblocking(fd) self.sock = fd self.closed = False def close(self): self.sock.close() self.closed = True def fileno(self): return self.sock.fileno() # TODO next def flush(self): pass def write(self, data): return self.sock.sendall(data) def readuntil(self, terminator, size=None): buf, self.sock.recvbuffer = self.sock.recvbuffer, b'' checked = 0 if size is None: while True: - found = buf.find(terminator, checked) + found = buf.find(terminator.encode(), checked) if found != -1: found += len(terminator) chunk, self.sock.recvbuffer = buf[:found], buf[found:] return chunk checked = max(0, len(buf) - (len(terminator) - 1)) d = self.sock.recv(BUFFER_SIZE) if not d: break buf += d return buf while len(buf) < size: found = buf.find(terminator.encode(), checked) if found != -1: found += len(terminator) chunk, self.sock.recvbuffer = buf[:found], buf[found:] return chunk checked = len(buf) d = self.sock.recv(BUFFER_SIZE) if not d: break buf += d chunk, self.sock.recvbuffer = buf[:size], buf[size:] return chunk def readline(self, size=None):
return self.readuntil(self.newlines, size=size) def __iter__(self): return self def readlines(self, size=None): return list(self.xreadlines(size=size)) def xreadlines(self, size=None): if size is None: while True: line = self.readline() if not line: break yield line else: while size > 0: line = self.readline(size) if not line: break yield line size -= len(line) def writelines(self, lines): for line in lines: self.write(line) read = read class GreenPipeSocket(GreenSocket): """ This is a weird class that looks like a socket but expects a file descriptor as an argument instead of a socket. """ recv = higher_order_recv(file_recv) send = higher_order_send(file_send) class GreenPipe(GreenFile): def __init__(self, fd): set_nonblocking(fd) self.fd = GreenPipeSocket(fd) super(GreenPipe, self).__init__(self.fd) def recv(self, *args, **kw): fn = self.recv = self.fd.recv return fn(*args, **kw) def send(self, *args, **kw): fn = self.send = self.fd.send return fn(*args, **kw) def flush(self): self.fd.fd.flush() def socketpair(*args): one, two = socket.socketpair(*args) return GreenSocket(one), GreenSocket(two) def fromfd(*args): return GreenSocket(socket.fromfd(*args)) diff --git a/eventlib/httpc.py b/eventlib/httpc.py index cd93f33..4a8d93e 100644 --- a/eventlib/httpc.py +++ b/eventlib/httpc.py @@ -1,758 +1,758 @@ # @author Donovan Preston # # Copyright (c) 2005-2006, Donovan Preston # Copyright (c) 2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import copy from eventlib.green import httplib import os.path import os import time import urllib.parse url_parser = urllib.parse.urlparse _old_HTTPConnection = httplib.HTTPConnection _old_HTTPSConnection = httplib.HTTPSConnection HTTP_TIME_FORMAT = '%a, %d %b %Y %H:%M:%S GMT' to_http_time = lambda t: time.strftime(HTTP_TIME_FORMAT, time.gmtime(t)) try: from mx import DateTime def from_http_time(t, defaultdate=None): return int(DateTime.Parser.DateTimeFromString( t, defaultdate=defaultdate).gmticks()) except ImportError: import calendar parse_formats = (HTTP_TIME_FORMAT, # RFC 1123 '%A, %d-%b-%y %H:%M:%S GMT', # RFC 850 '%a %b %d %H:%M:%S %Y') # asctime def from_http_time(t, defaultdate=None): for parser in parse_formats: try: return calendar.timegm(time.strptime(t, parser)) except ValueError: continue return defaultdate def host_and_port_from_url(url): """@brief Simple function to get host and port from an http url. @return Returns host, port and port may be None. 
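For example (illustrative): host_and_port_from_url('http://example.com:8080/x') returns ('example.com', '8080'); when the URL carries no explicit port, the bare host is returned and port stays None.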
""" host = None port = None parsed_url = url_parser(url) try: host, port = parsed_url[1].split(':') except ValueError: host = parsed_url[1].split(':') return host, port def better_putrequest(self, method, url, skip_host=0, skip_accept_encoding=0): self.method = method self.path = url try: # Python 2.4 and above self.old_putrequest(method, url, skip_host, skip_accept_encoding) except TypeError: # Python 2.3 and below self.old_putrequest(method, url, skip_host) class HttpClient(httplib.HTTPConnection): """A subclass of httplib.HTTPConnection that provides a better putrequest that records the method and path on the request object. """ def __init__(self, host, port=None, strict=None): _old_HTTPConnection.__init__(self, host, port, strict) old_putrequest = httplib.HTTPConnection.putrequest putrequest = better_putrequest class HttpsClient(httplib.HTTPSConnection): """A subclass of httplib.HTTPSConnection that provides a better putrequest that records the method and path on the request object. """ old_putrequest = httplib.HTTPSConnection.putrequest putrequest = better_putrequest def wrap_httplib_with_httpc(): """Replace httplib's implementations of these classes with our enhanced ones. Needed to work around code that uses httplib directly.""" httplib.HTTP._connection_class = httplib.HTTPConnection = HttpClient httplib.HTTPS._connection_class = httplib.HTTPSConnection = HttpsClient class FileScheme(object): """Retarded scheme to local file wrapper.""" host = '' port = '' reason = '' def __init__(self, location): pass def request(self, method, fullpath, body='', headers=None): self.status = 200 self.msg = '' self.path = fullpath.split('?')[0] self.method = method = method.lower() assert method in ('get', 'put', 'delete') if method == 'delete': try: os.remove(self.path) except OSError: pass # don't complain if already deleted elif method == 'put': try: - f = file(self.path, 'w') + f = open(self.path, 'w') f.write(body) f.close() except IOError as e: self.status = 500 self.raise_connection_error() elif method == 'get': if not os.path.exists(self.path): self.status = 404 self.raise_connection_error(NotFound) def connect(self): pass def getresponse(self): return self def getheader(self, header): if header == 'content-length': try: return os.path.getsize(self.path) except OSError: return 0 def read(self, howmuch=None): if self.method == 'get': try: - fl = file(self.path, 'r') + fl = open(self.path, 'r') if howmuch is None: return fl.read() else: return fl.read(howmuch) except IOError: self.status = 500 self.raise_connection_error() return '' def raise_connection_error(self, klass=None): if klass is None: klass=ConnectionError raise klass(_Params('file://' + self.path, self.method)) def close(self): """We're challenged here, and read the whole file rather than integrating with this lib. file object already out of scope at this point""" pass class _Params(object): def __init__(self, url, method, body='', headers=None, dumper=None, loader=None, use_proxy=False, ok=(), aux=None): ''' @param connection The connection (as returned by make_connection) to use for the request. @param method HTTP method @param url Full url to make request on. @param body HTTP body, if necessary for the method. Can be any object, assuming an appropriate dumper is also provided. @param headers Dict of header name to header value @param dumper Method that formats the body as a string. @param loader Method that converts the response body into an object. @param use_proxy Set to True if the connection is to a proxy. 
@param ok Set of valid response statuses. If the returned status is not in this list, an exception is thrown. ''' self.instance = None self.url = url self.path = url self.method = method self.body = body if headers is None: self.headers = {} else: self.headers = headers self.dumper = dumper self.loader = loader self.use_proxy = use_proxy self.ok = ok or (200, 201, 204) self.orig_body = body self.aux = aux class _LocalParams(_Params): def __init__(self, params, **kwargs): self._delegate = params for k, v in kwargs.items(): setattr(self, k, v) def __getattr__(self, key): if key == '__setstate__': return return getattr(self._delegate, key) def __reduce__(self): params = copy.copy(self._delegate) kwargs = copy.copy(self.__dict__) assert('_delegate' in kwargs) del kwargs['_delegate'] if hasattr(params,'aux'): del params.aux return (_LocalParams,(params,),kwargs) def __setitem__(self, k, item): setattr(self, k, item) class ConnectionError(Exception): """Detailed exception class for reporting on http connection problems. There are lots of subclasses so you can use closely-specified exception clauses.""" def __init__(self, params): self.params = params Exception.__init__(self) def location(self): # the response msg is a Message object in Python 3; use its # case-insensitive mapping interface instead of the old .dict attribute return self.params.response.msg.get('location') def expired(self): # 14.21 Expires # # HTTP/1.1 clients and caches MUST treat other invalid date # formats, especially including the value "0", as in the past # (i.e., "already expired"). expires = from_http_time( self.params.response_headers.get('expires', '0'), defaultdate=DateTime.Epoch) return time.time() > expires def __repr__(self): try: response = self.params.response return "%s(url=%r, method=%r, status=%r, reason=%r, body=%r)" % ( self.__class__.__name__, self.params.url, self.params.method, response.status, response.reason, self.params.body) except AttributeError: # url and method are required fields on the params object return "%s(url=%r, method=%r)" % ( self.__class__.__name__, self.params.url, self.params.method) __str__ = __repr__ class UnparseableResponse(ConnectionError): """Raised when a loader cannot parse the response from the server.""" def __init__(self, content_type, parser, response, url): self.content_type = content_type self.parser = parser self.response = response self.url = url Exception.__init__(self) def __repr__(self): return "Could not parse the data at the URL %r of content-type %r with %r\nData:\n%s" % ( self.url, self.content_type, self.parser, self.response) __str__ = __repr__ class Accepted(ConnectionError): """ 202 Accepted """ pass class Retriable(ConnectionError): def retry_method(self): return self.params.method def retry_url(self): return self.location() or self.url() def retry_(self): params = _LocalParams(self.params, url=self.retry_url(), path=self.retry_url(), # need to override both path and # url because request_ uses path # but not url when using a proxy method=self.retry_method()) return self.params.instance.request_(params) def retry(self): return self.retry_()[-1] class MovedPermanently(Retriable): """ 301 Moved Permanently """ pass class Found(Retriable): """ 302 Found """ pass class SeeOther(Retriable): """ 303 See Other """ def retry_method(self): return 'GET' class NotModified(ConnectionError): """ 304 Not Modified """ pass class TemporaryRedirect(Retriable): """ 307 Temporary Redirect """ pass class BadRequest(ConnectionError): """ 400 Bad Request """ pass class Unauthorized(ConnectionError): """ 401 Unauthorized """ pass class PaymentRequired(ConnectionError): """ 402 Payment Required """ pass
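The 4xx/5xx classes below continue the same hierarchy. A hedged usage sketch (illustrative, not from the patch; the URL is invented, while get, NotFound and Retriable are the names this module defines):

```python
# Illustrative only -- not part of the patch.
from eventlib import httpc

try:
    body = httpc.get('http://example.invalid/missing')
except httpc.NotFound as e:
    print('missing:', e.params.url)  # a 404 arrives as a typed exception
except httpc.Retriable as e:
    body = e.retry()                 # replay a 3xx at its Location target
```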
class Forbidden(ConnectionError): """ 403 Forbidden """ pass class NotFound(ConnectionError): """ 404 Not Found """ pass class RequestTimeout(ConnectionError): """ 408 RequestTimeout """ pass class Gone(ConnectionError): """ 410 Gone """ pass class LengthRequired(ConnectionError): """ 411 Length Required """ pass class RequestEntityTooLarge(ConnectionError): """ 413 Request Entity Too Large """ pass class RequestURITooLong(ConnectionError): """ 414 Request-URI Too Long """ pass class UnsupportedMediaType(ConnectionError): """ 415 Unsupported Media Type """ pass class RequestedRangeNotSatisfiable(ConnectionError): """ 416 Requested Range Not Satisfiable """ pass class ExpectationFailed(ConnectionError): """ 417 Expectation Failed """ pass class NotImplemented(ConnectionError): """ 501 Not Implemented """ pass class ServiceUnavailable(Retriable): """ 503 Service Unavailable """ def url(self): return self.params._delegate.url class GatewayTimeout(Retriable): """ 504 Gateway Timeout """ def url(self): return self.params._delegate.url class HTTPVersionNotSupported(ConnectionError): """ 505 HTTP Version Not Supported """ pass class InternalServerError(ConnectionError): """ 500 Internal Server Error """ def __repr__(self): try: import simplejson traceback = simplejson.loads(self.params.response_body) except: try: from indra.base import llsd traceback = llsd.parse(self.params.response_body) except: traceback = self.params.response_body if(isinstance(traceback, dict) and 'stack-trace' in traceback and 'description' in traceback): body = traceback traceback = "Traceback (most recent call last):\n" for frame in body['stack-trace']: traceback += ' File "%s", line %s, in %s\n' % ( frame['filename'], frame['lineno'], frame['method']) for line in frame['code']: if line['lineno'] == frame['lineno']: traceback += ' %s' % (line['line'].lstrip(), ) break traceback += body['description'] return "The server raised an exception from our request:\n%s %s\n%s %s\n%s" % ( self.params.method, self.params.url, self.params.response.status, self.params.response.reason, traceback) __str__ = __repr__ status_to_error_map = { 202: Accepted, 301: MovedPermanently, 302: Found, 303: SeeOther, 304: NotModified, 307: TemporaryRedirect, 400: BadRequest, 401: Unauthorized, 402: PaymentRequired, 403: Forbidden, 404: NotFound, 408: RequestTimeout, 410: Gone, 411: LengthRequired, 413: RequestEntityTooLarge, 414: RequestURITooLong, 415: UnsupportedMediaType, 416: RequestedRangeNotSatisfiable, 417: ExpectationFailed, 500: InternalServerError, 501: NotImplemented, 503: ServiceUnavailable, 504: GatewayTimeout, 505: HTTPVersionNotSupported, } scheme_to_factory_map = { 'http': HttpClient, 'https': HttpsClient, 'file': FileScheme, } def make_connection(scheme, location, use_proxy): """ Create a connection object to a host:port. @param scheme Protocol, scheme, whatever you want to call it. http, file, https are currently supported. @param location Hostname and port number, formatted as host:port or http://host:port if you're so inclined. @param use_proxy Connect to a proxy instead of the actual location. Uses environment variables to decide where the proxy actually lives. 
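For example (illustrative): make_connection('http', 'example.com:80', use_proxy=False) returns an already-connected HttpClient instance.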
""" if use_proxy: if "http_proxy" in os.environ: location = os.environ["http_proxy"] elif "ALL_PROXY" in os.environ: location = os.environ["ALL_PROXY"] else: location = "localhost:3128" #default to local squid # run a little heuristic to see if location is an url, and if so parse out the hostpart if location.startswith('http'): _scheme, location, path, parameters, query, fragment = url_parser(location) if use_proxy and scheme == 'https': scheme = 'http' result = scheme_to_factory_map[scheme](location) result.connect() return result def connect(url, use_proxy=False): """ Create a connection object to the host specified in a url. Convenience function for make_connection.""" scheme, location = url_parser(url)[:2] try: return make_connection(scheme, location, use_proxy) except KeyError: raise ValueError("Unknown url scheme %s in url %s" % (scheme, url)) def make_safe_loader(loader): if not callable(loader): return loader def safe_loader(what): try: return loader(what) except Exception: import traceback traceback.print_exc() return None return safe_loader class HttpSuite(object): def __init__(self, dumper, loader, fallback_content_type): self.dumper = dumper self.loader = loader self.fallback_content_type = fallback_content_type def request_(self, params, connection=None): '''Make an http request to a url, for internal use mostly.''' params = _LocalParams(params, instance=self) (scheme, location, path, parameters, query, fragment) = url_parser(params.url) if params.use_proxy: if scheme == 'file': params.use_proxy = False else: params.headers['host'] = location if not params.use_proxy: params.path = path if query: params.path += '?' + query params.orig_body = params.body if params.method in ('PUT', 'POST'): if self.dumper is not None: params.body = self.dumper(params.body) # don't set content-length header because httplib does it # for us in _send_request else: params.body = '' params.response, params.response_body = self._get_response_body(params, connection) response, body = params.response, params.response_body if self.loader is not None: try: body = make_safe_loader(self.loader(body)) except KeyboardInterrupt: raise except Exception as e: raise UnparseableResponse(response.msg.get('content-type', '(unknown)'), self.loader, body, params.url) return response.status, response.msg, body def _check_status(self, params): response = params.response if response.status not in params.ok: klass = status_to_error_map.get(response.status, ConnectionError) raise klass(params) def _get_response_body(self, params, connection): if connection is None: connection = connect(params.url, params.use_proxy) # if we're creating a new connection we know the caller # isn't going to reuse it params.headers['connection'] = 'close' connection.request(params.method, params.path, params.body, params.headers) params.response = connection.getresponse() params.response_body = params.response.read() connection.close() self._check_status(params) return params.response, params.response_body def request(self, params, connection=None): return self.request_(params, connection=connection)[-1] def head_( self, url, headers=None, use_proxy=False, ok=None, aux=None, connection=None): return self.request_( _Params( url, 'HEAD', headers=headers, loader=self.loader, dumper=self.dumper, use_proxy=use_proxy, ok=ok, aux=aux), connection) def head(self, *args, **kwargs): return self.head_(*args, **kwargs)[-1] def get_( self, url, headers=None, use_proxy=False, ok=None, aux=None, max_retries=8, connection=None): if headers is None: headers = 
{} headers['accept'] = self.fallback_content_type+';q=1,*/*;q=0' def req(): return self.request_(_Params(url, 'GET', headers=headers, loader=self.loader, dumper=self.dumper, use_proxy=use_proxy, ok=ok, aux=aux), connection) def retry_response(err): def doit(): return err.retry_() return doit retried = 0 while retried <= max_retries: try: return req() except (Found, TemporaryRedirect, MovedPermanently, SeeOther) as e: if retried >= max_retries: raise retried += 1 req = retry_response(e) def get(self, *args, **kwargs): return self.get_(*args, **kwargs)[-1] def put_(self, url, data, headers=None, content_type=None, ok=None, aux=None, connection=None): if headers is None: headers = {} if 'content-type' not in headers: if content_type is None: headers['content-type'] = self.fallback_content_type else: headers['content-type'] = content_type headers['accept'] = headers['content-type']+';q=1,*/*;q=0' return self.request_( _Params( url, 'PUT', body=data, headers=headers, loader=self.loader, dumper=self.dumper, ok=ok, aux=aux), connection) def put(self, *args, **kwargs): return self.put_(*args, **kwargs)[-1] def delete_(self, url, ok=None, aux=None, connection=None): return self.request_( _Params( url, 'DELETE', loader=self.loader, dumper=self.dumper, ok=ok, aux=aux), connection) def delete(self, *args, **kwargs): return self.delete_(*args, **kwargs)[-1] def post_( self, url, data='', headers=None, use_proxy=False, content_type=None, ok=None, aux=None, connection=None): if headers is None: headers = {} if 'content-type' not in headers: if content_type is None: headers['content-type'] = self.fallback_content_type else: headers['content-type'] = content_type headers['accept'] = headers['content-type']+';q=1,*/*;q=0' return self.request_( _Params( url, 'POST', body=data, headers=headers, loader=self.loader, dumper=self.dumper, use_proxy=use_proxy, ok=ok, aux=aux), connection) def post(self, *args, **kwargs): return self.post_(*args, **kwargs)[-1] class HttpStreamSuite(HttpSuite): def request_(self, params): '''Make an http request to a url, for internal use mostly.''' params = _LocalParams(params, instance=self) (scheme, location, path, parameters, query, fragment) = url_parser(params.url) if params.use_proxy: if scheme == 'file': params.use_proxy = False else: params.headers['host'] = location if not params.use_proxy: params.path = path if query: params.path += '?' 
+ query params.orig_body = params.body if params.method in ('PUT', 'POST'): if self.dumper is not None: params.body = self.dumper(params.body) # don't set content-length header because httplib does it # for us in _send_request else: params.body = '' params.response = self._get_response_body(params) response = params.response return response.status, response.msg, response def _get_response_body(self, params): connection = connect(params.url, params.use_proxy) connection.request(params.method, params.path, params.body, params.headers) params.response = connection.getresponse() #connection.close() self._check_status(params) return params.response def make_suite(dumper, loader, fallback_content_type): """ Return a tuple of methods for making http requests with automatic bidirectional formatting with a particular content-type.""" suite = HttpSuite(dumper, loader, fallback_content_type) return suite.get, suite.put, suite.delete, suite.post suite = HttpSuite(str, None, 'text/plain') delete = suite.delete delete_ = suite.delete_ get = suite.get get_ = suite.get_ head = suite.head head_ = suite.head_ post = suite.post post_ = suite.post_ put = suite.put put_ = suite.put_ request = suite.request request_ = suite.request_ diff --git a/eventlib/hubs/hub.py b/eventlib/hubs/hub.py index bbc5ffd..08f0b8b 100644 --- a/eventlib/hubs/hub.py +++ b/eventlib/hubs/hub.py @@ -1,305 +1,305 @@ # Copyright (c) 2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import bisect import sys import traceback import time from eventlib.support import greenlets as greenlet from eventlib.timer import Timer, LocalTimer _g_debug = True class BaseHub(object): """ Base hub class for easing the implementation of subclasses that are specific to a particular underlying event architecture. """ SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit) def __init__(self, clock=time.time): self.readers = {} self.writers = {} self.excs = {} self.clock = clock self.greenlet = greenlet.greenlet(self.run) self.stopping = False self.running = False self.timers = [] self.next_timers = [] self.observers = {} self.observer_modes = { 'entry': [], 'before_timers': [], 'before_waiting': [], 'after_waiting': [], 'exit': [], } def add_descriptor(self, fileno, read=None, write=None, exc=None): """ Signals an intent to read/write from a particular file descriptor. The fileno argument is the file number of the file of interest. The other arguments are either callbacks or None. 
If there is a callback for read or write, the hub sets things up so that when the file descriptor is ready to be read or written, the callback is called. The exc callback is called when the socket represented by the file descriptor is closed. The intent is that the exc callbacks should only be present when either a read or write callback is also present, so the exc callback happens instead of the respective read or write callback. """ read = read or self.readers.get(fileno) if read is not None: self.readers[fileno] = read else: self.readers.pop(fileno, None) write = write or self.writers.get(fileno) if write is not None: self.writers[fileno] = write else: self.writers.pop(fileno, None) exc = exc or self.excs.get(fileno) if exc is not None: self.excs[fileno] = exc else: self.excs.pop(fileno, None) return fileno def remove_descriptor(self, fileno): self.readers.pop(fileno, None) self.writers.pop(fileno, None) self.excs.pop(fileno, None) def exc_descriptor(self, fileno): exc = self.excs.get(fileno) if exc is not None: try: exc(fileno) except self.SYSTEM_EXCEPTIONS: self.squelch_exception(fileno, sys.exc_info()) def stop(self): self.abort() if self.greenlet is not greenlet.getcurrent(): self.switch() def switch(self): cur = greenlet.getcurrent() assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP' switch_out = getattr(cur, 'switch_out', None) if switch_out is not None: try: switch_out() except: traceback.print_exception(*sys.exc_info()) if self.greenlet.dead: self.greenlet = greenlet.greenlet(self.run) try: greenlet.getcurrent().parent = self.greenlet except ValueError: pass return self.greenlet.switch() def squelch_exception(self, fileno, exc_info): traceback.print_exception(*exc_info) print("Removing descriptor: %r" % (fileno,), file=sys.stderr) try: self.remove_descriptor(fileno) except Exception as e: print("Exception while removing descriptor! %r" % (e,), file=sys.stderr) def wait(self, seconds=None): raise NotImplementedError("Implement this in a subclass") def default_sleep(self): return 60.0 def sleep_until(self): t = self.timers if not t: return None return t[0][0] def run(self): """Run the runloop until abort is called. """ if self.running: raise RuntimeError("Already running!") try: self.running = True self.stopping = False self.fire_observers('entry') while not self.stopping: self.prepare_timers() self.fire_observers('before_timers') self.fire_timers(self.clock()) self.prepare_timers() wakeup_when = self.sleep_until() if wakeup_when is None: - sleep_time = self.default_sleep() + sleep_time = self.default_sleep() else: sleep_time = wakeup_when - self.clock() if sleep_time > 0: self.fire_observers('before_waiting') self.wait(sleep_time) self.fire_observers('after_waiting') else: self.wait(0) else: del self.timers[:] del self.next_timers[:] self.fire_observers('exit') finally: self.running = False self.stopping = False def abort(self): """Stop the runloop. If run is executing, it will exit after completing the next runloop iteration. """ if self.running: self.stopping = True def add_observer(self, observer, *modes): """Add an event observer to this runloop with the given modes. Valid modes are: entry: The runloop is being entered. before_timers: Before the expired timers for this iteration are executed. before_waiting: Before waiting for the calculated wait_time where nothing will happen. after_waiting: After waiting, immediately before starting the top of the runloop again. exit: The runloop is exiting.
If no mode is passed or mode is all, the observer will be fired for every event type. """ if not modes or modes == ('all',): modes = tuple(self.observer_modes) self.observers[observer] = modes for mode in modes: self.observer_modes[mode].append(observer) def remove_observer(self, observer): """Remove a previously registered observer from all event types. """ for mode in self.observers.pop(observer, ()): self.observer_modes[mode].remove(observer) def squelch_observer_exception(self, observer, exc_info): traceback.print_exception(*exc_info) print("Removing observer: %r" % (observer,), file=sys.stderr) self.remove_observer(observer) def fire_observers(self, activity): for observer in self.observer_modes[activity]: try: observer(self, activity) except self.SYSTEM_EXCEPTIONS: raise except: self.squelch_observer_exception(observer, sys.exc_info()) def squelch_timer_exception(self, timer, exc_info): traceback.print_exception(*exc_info) print("Timer raised: %r" % (timer,), file=sys.stderr) def _add_absolute_timer(self, when, info): # the 0 placeholder makes it easy to bisect_right using (now, 1) self.next_timers.append((when, 0, info)) def add_timer(self, timer): scheduled_time = self.clock() + timer.seconds self._add_absolute_timer(scheduled_time, timer) return scheduled_time def timer_finished(self, timer): pass def timer_canceled(self, timer): self.timer_finished(timer) def prepare_timers(self): ins = bisect.insort_right t = self.timers for item in self.next_timers: ins(t, item) del self.next_timers[:] def schedule_call_local(self, seconds, cb, *args, **kw): """Schedule a callable to be called after 'seconds' seconds have elapsed. Cancel the timer if greenlet has exited. seconds: The number of seconds to wait. cb: The callable to call after the given time. *args: Arguments to pass to the callable when called. **kw: Keyword arguments to pass to the callable when called. """ t = LocalTimer(seconds, cb, *args, **kw) self.add_timer(t) return t schedule_call = schedule_call_local def schedule_call_global(self, seconds, cb, *args, **kw): """Schedule a callable to be called after 'seconds' seconds have elapsed. The timer will NOT be cancelled if the current greenlet has exited before the timer fires. seconds: The number of seconds to wait. cb: The callable to call after the given time. *args: Arguments to pass to the callable when called. **kw: Keyword arguments to pass to the callable when called. 
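Example (illustrative): hub.schedule_call_global(10, reconnect) runs reconnect() about ten seconds from now, even if the scheduling greenlet has since exited.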
""" t = Timer(seconds, cb, *args, **kw) self.add_timer(t) return t def fire_timers(self, when): t = self.timers last = bisect.bisect_right(t, (when, 1)) i = 0 for i in range(last): timer = t[i][2] try: try: timer() except self.SYSTEM_EXCEPTIONS: raise except: self.squelch_timer_exception(timer, sys.exc_info()) finally: self.timer_finished(timer) del t[:last] # for debugging: def get_readers(self): return self.readers def get_writers(self): return self.writers def get_excs(self): return self.excs def get_timers_count(hub): return max(len(x) for x in [hub.timers, hub.next_timers]) diff --git a/eventlib/proc.py b/eventlib/proc.py index e725109..d6c4e36 100644 --- a/eventlib/proc.py +++ b/eventlib/proc.py @@ -1,739 +1,739 @@ # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """Advanced coroutine control. This module provides means to spawn, kill and link coroutines. Linking means subscribing to the coroutine's result, either in form of return value or unhandled exception. To create a linkable coroutine use spawn function provided by this module: >>> def demofunc(x, y): ... return x / y >>> p = spawn(demofunc, 6, 2) The return value of spawn is an instance of Proc class that you can "link": * p.link(obj) - notify obj when the coroutine is finished What does "notify" means here depends on the type of `obj': a callable is simply called, an event or a queue is notified using send/send_exception methods and if `obj' is another greenlet it's killed with LinkedExited exception. Here's an example: >>> event = coros.event() >>> _ = p.link(event) >>> event.wait() 3 Now, even though `p' is finished it's still possible to link it. In this case the notification is performed immediatelly: >>> try: ... p.link() ... except LinkedCompleted: ... print 'LinkedCompleted' LinkedCompleted (Without an argument, link is created to the current greenlet) There are also link_value and link_exception methods that only deliver a return value and an unhandled exception respectively (plain `link' deliver both). Suppose we want to spawn a greenlet to do an important part of the task; if it fails then there's no way to complete the task so the parent must fail as well; `link_exception' is useful here: >>> p = spawn(demofunc, 1, 0) >>> _ = p.link_exception() >>> try: ... api.sleep(1) ... except LinkedFailed: ... print 'LinkedFailed' LinkedFailed One application of linking is `waitall' function: link to a bunch of coroutines and wait for all them to complete. 
Such function is provided by this module. """ import sys from eventlib import api, coros __all__ = ['LinkedExited', 'LinkedFailed', 'LinkedCompleted', 'LinkedKilled', 'ProcExit', 'waitall', - 'killall' + 'killall', 'Source', 'Proc', 'spawn', 'spawn_link', 'spawn_link_value', 'spawn_link_exception'] class LinkedExited(Exception): """Raised when a linked proc exits""" msg = "%r exited" def __init__(self, name=None, msg=None): self.name = name if msg is None: msg = self.msg % self.name Exception.__init__(self, msg) class LinkedFailed(LinkedExited): """Raised when a linked proc dies because of unhandled exception""" msg = "%r failed with %s" def __init__(self, name, typ, value=None, tb=None): msg = self.msg % (name, typ.__name__) LinkedExited.__init__(self, name, msg) class LinkedCompleted(LinkedExited): """Raised when a linked proc finishes the execution cleanly""" msg = "%r completed successfully" class LinkedKilled(LinkedFailed): """Raised when a linked proc dies because of unhandled GreenletExit (i.e. it was killed) """ msg = """%r was killed with %s""" def getLinkedFailed(name, typ, value=None, tb=None): if issubclass(typ, api.GreenletExit): return LinkedKilled(name, typ, value, tb) return LinkedFailed(name, typ, value, tb) class ProcExit(api.GreenletExit): """Raised when this proc is killed.""" class Link(object): def __init__(self, listener): self.listener = listener def cancel(self): self.listener = None def __enter__(self): pass def __exit__(self, *args): self.cancel() class LinkToEvent(Link): def __call__(self, source): if self.listener is None: return if source.has_value(): self.listener.send(source.value) else: self.listener.send_exception(*source.exc_info()) class LinkToGreenlet(Link): def __call__(self, source): if source.has_value(): self.listener.throw(LinkedCompleted(source.name)) else: self.listener.throw(getLinkedFailed(source.name, *source.exc_info())) class LinkToCallable(Link): def __call__(self, source): self.listener(source) def waitall(lst, trap_errors=False, queue=None): if queue is None: queue = coros.queue() index = -1 for (index, linkable) in enumerate(lst): linkable.link(decorate_send(queue, index)) len = index + 1 results = [None] * len count = 0 while count < len: try: index, value = queue.wait() except Exception: if not trap_errors: raise else: results[index] = value count += 1 return results class decorate_send(object): def __init__(self, event, tag): self._event = event self._tag = tag def __repr__(self): params = (type(self).__name__, self._tag, self._event) return '<%s tag=%r event=%r>' % params def __getattr__(self, name): assert name != '_event' return getattr(self._event, name) def send(self, value): self._event.send((self._tag, value)) def killall(procs, *throw_args, **kwargs): if not throw_args: throw_args = (ProcExit, ) wait = kwargs.pop('wait', False) if kwargs: raise TypeError('Invalid keyword argument for proc.killall(): %s' % ', '.join(list(kwargs.keys()))) for g in procs: if not g.dead: api.get_hub().schedule_call_global(0, g.throw, *throw_args) if wait and api.getcurrent() is not api.get_hub().greenlet: api.sleep(0) class NotUsed(object): def __str__(self): return '' __repr__ = __str__ _NOT_USED = NotUsed() def spawn_greenlet(function, *args): """Create a new greenlet that will run `function(*args)'. The current greenlet won't be unscheduled. Keyword arguments aren't supported (limitation of greenlet), use spawn() to work around that. 
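Example (illustrative): g = spawn_greenlet(some_func, 1, 2) returns immediately and some_func(1, 2) runs on the next hub iteration.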
""" g = api.Greenlet(function) g.parent = api.get_hub().greenlet api.get_hub().schedule_call_global(0, g.switch, *args) return g class Source(object): """Maintain a set of links to the listeners. Delegate the sent value or the exception to all of them. To set up a link, use link_value, link_exception or link method. The latter establishes both "value" and "exception" link. It is possible to link to events, queues, greenlets and callables. >>> source = Source() >>> event = coros.event() >>> _ = source.link(event) Once source's send or send_exception method is called, all the listeners with the right type of link will be notified ("right type" means that exceptions won't be delivered to "value" links and values won't be delivered to "exception" links). Once link has been fired it is removed. Notifying listeners is performed in the MAINLOOP greenlet. Under the hood notifying a link means executing a callback, see Link class for details. Notification must not attempt to switch to the hub, i.e. call any of blocking functions. >>> source.send('hello') >>> event.wait() 'hello' Any error happened while sending will be logged as a regular unhandled exception. This won't prevent other links from being fired. There 3 kinds of listeners supported: 1. If `listener' is a greenlet (regardless if it's a raw greenlet or an extension like Proc), a subclass of LinkedExited exception is raised in it. 2. If `listener' is something with send/send_exception methods (event, queue, Source but not Proc) the relevant method is called. 3. If `listener' is a callable, it is called with 1 argument (the result) for "value" links and with 3 arguments (typ, value, tb) for "exception" links. """ def __init__(self, name=None): self.name = name self._value_links = {} self._exception_links = {} self.value = _NOT_USED self._exc = None def _repr_helper(self): result = [] result.append(repr(self.name)) if self.value is not _NOT_USED: if self._exc is None: res = repr(self.value) if len(res)>50: res = res[:50]+'...' 
result.append('result=%s' % res) else: result.append('raised=%s' % (self._exc, )) result.append('{%s:%s}' % (len(self._value_links), len(self._exception_links))) return result def __repr__(self): klass = type(self).__name__ return '<%s at %s %s>' % (klass, hex(id(self)), ' '.join(self._repr_helper())) def ready(self): return self.value is not _NOT_USED def has_value(self): return self.value is not _NOT_USED and self._exc is None def has_exception(self): return self.value is not _NOT_USED and self._exc is not None def exc_info(self): if not self._exc: return (None, None, None) elif len(self._exc)==3: return self._exc elif len(self._exc)==1: if isinstance(self._exc[0], type): return self._exc[0], None, None else: return self._exc[0].__class__, self._exc[0], None elif len(self._exc)==2: return self._exc[0], self._exc[1], None else: return self._exc def link_value(self, listener=None, link=None): if self.ready() and self._exc is not None: return if listener is None: listener = api.getcurrent() if link is None: link = self.getLink(listener) if self.ready() and listener is api.getcurrent(): link(self) else: self._value_links[listener] = link if self.value is not _NOT_USED: self._start_send() return link def link_exception(self, listener=None, link=None): if self.value is not _NOT_USED and self._exc is None: return if listener is None: listener = api.getcurrent() if link is None: link = self.getLink(listener) if self.ready() and listener is api.getcurrent(): link(self) else: self._exception_links[listener] = link if self.value is not _NOT_USED: self._start_send_exception() return link def link(self, listener=None, link=None): if listener is None: listener = api.getcurrent() if link is None: link = self.getLink(listener) if self.ready() and listener is api.getcurrent(): if self._exc is None: link(self) else: link(self) else: self._value_links[listener] = link self._exception_links[listener] = link if self.value is not _NOT_USED: if self._exc is None: self._start_send() else: self._start_send_exception() return link def unlink(self, listener=None): if listener is None: listener = api.getcurrent() self._value_links.pop(listener, None) self._exception_links.pop(listener, None) @staticmethod def getLink(listener): if hasattr(listener, 'throw'): return LinkToGreenlet(listener) if hasattr(listener, 'send'): return LinkToEvent(listener) elif callable(listener): return LinkToCallable(listener) else: raise TypeError("Don't know how to link to %r" % (listener, )) def send(self, value): assert not self.ready(), "%s has been fired already" % self self.value = value self._exc = None self._start_send() def _start_send(self): api.get_hub().schedule_call_global(0, self._do_send, list(self._value_links.items()), self._value_links) def send_exception(self, *throw_args): assert not self.ready(), "%s has been fired already" % self self.value = None self._exc = throw_args self._start_send_exception() def _start_send_exception(self): api.get_hub().schedule_call_global(0, self._do_send, list(self._exception_links.items()), self._exception_links) def _do_send(self, links, consult): while links: listener, link = links.pop() try: if listener in consult: try: link(self) finally: consult.pop(listener, None) except: api.get_hub().schedule_call_global(0, self._do_send, links, consult) raise def wait(self, timeout=None, *throw_args): """Wait until send() or send_exception() is called or `timeout' has expired. Return the argument of send or raise the argument of send_exception. If timeout has expired, None is returned. 
The arguments, when provided, specify how many seconds to wait and what to do when timeout has expired. They are treated the same way as api.timeout treats them. """ if self.value is not _NOT_USED: if self._exc is None: return self.value else: api.getcurrent().throw(*self._exc) if timeout is not None: timer = api.timeout(timeout, *throw_args) timer.__enter__() if timeout==0: if timer.__exit__(None, None, None): return else: try: api.getcurrent().throw(*timer.throw_args) except: if not timer.__exit__(*sys.exc_info()): raise return EXC = True try: try: waiter = Waiter() self.link(waiter) try: return waiter.wait() finally: self.unlink(waiter) except: EXC = False if timeout is None or not timer.__exit__(*sys.exc_info()): raise finally: if timeout is not None and EXC: timer.__exit__(None, None, None) class Waiter(object): def __init__(self): self.greenlet = None def send(self, value): """Wake up the greenlet that is calling wait() currently (if there is one). Can only be called from get_hub().greenlet. """ assert api.getcurrent() is api.get_hub().greenlet if self.greenlet is not None: self.greenlet.switch(value) def send_exception(self, *throw_args): """Make greenlet calling wait() wake up (if there is a wait()). Can only be called from get_hub().greenlet. """ assert api.getcurrent() is api.get_hub().greenlet if self.greenlet is not None: self.greenlet.throw(*throw_args) def wait(self): """Wait until send or send_exception is called. Return value passed into send() or raise exception passed into send_exception(). """ assert self.greenlet is None current = api.getcurrent() assert current is not api.get_hub().greenlet self.greenlet = current try: return api.get_hub().switch() finally: self.greenlet = None class Proc(Source): """A linkable coroutine based on Source. Upon completion, delivers coroutine's result to the listeners. """ def __init__(self, name=None): self.greenlet = None Source.__init__(self, name) def _repr_helper(self): if self.greenlet is not None and self.greenlet.dead: dead = '(dead)' else: dead = '' return ['%r%s' % (self.greenlet, dead)] + Source._repr_helper(self) def __repr__(self): klass = type(self).__name__ return '<%s %s>' % (klass, ' '.join(self._repr_helper())) def __bool__(self): if self.ready(): # with current _run this does not makes any difference # still, let keep it there return False # otherwise bool(proc) is the same as bool(greenlet) if self.greenlet is not None: return bool(self.greenlet) @property def dead(self): return self.ready() or self.greenlet.dead @classmethod def spawn(cls, function, *args, **kwargs): """Return a new Proc instance that is scheduled to execute function(*args, **kwargs) upon the next hub iteration. """ proc = cls() proc.run(function, *args, **kwargs) return proc def run(self, function, *args, **kwargs): """Create a new greenlet to execute `function(*args, **kwargs)'. The created greenlet is scheduled to run upon the next hub iteration. """ assert self.greenlet is None, "'run' can only be called once per instance" if self.name is None: self.name = str(function) self.greenlet = spawn_greenlet(self._run, function, args, kwargs) def _run(self, function, args, kwargs): """Internal top level function. Execute *function* and send its result to the listeners. """ try: result = function(*args, **kwargs) except api.GreenletExit as e: self.send_exception(e) raise except: self.send_exception(*sys.exc_info()) raise # let mainloop log the exception else: self.send(result) def throw(self, *throw_args): """Used internally to raise the exception. 
Behaves exactly like greenlet's 'throw' with the exception that ProcExit is raised by default. Do not use this function as it leaves the current greenlet unscheduled forever. Use the kill() method instead. """ if not self.dead: if not throw_args: throw_args = (ProcExit, ) self.greenlet.throw(*throw_args) def kill(self, *throw_args): """Raise an exception in the greenlet. Unschedule the current greenlet so that this Proc can handle the exception (or die). The exception can be specified with throw_args. By default, ProcExit is raised. """ if not self.dead: if not throw_args: throw_args = (ProcExit, ) api.get_hub().schedule_call_global(0, self.greenlet.throw, *throw_args) if api.getcurrent() is not api.get_hub().greenlet: api.sleep(0) # QQQ maybe Proc should not inherit from Source (because its send() and send_exception() # QQQ methods are for internal use only) spawn = Proc.spawn def spawn_link(function, *args, **kwargs): p = spawn(function, *args, **kwargs) p.link() return p def spawn_link_value(function, *args, **kwargs): p = spawn(function, *args, **kwargs) p.link_value() return p def spawn_link_exception(function, *args, **kwargs): p = spawn(function, *args, **kwargs) p.link_exception() return p def trap_errors(errors, func, *args, **kwargs): """DEPRECATED in favor of wrap_errors""" try: return func(*args, **kwargs) except errors as ex: return ex class wrap_errors(object): """Helper to make a function return an exception, rather than raise it. Because every exception that is unhandled by a greenlet will be logged by the hub, it is desirable to prevent non-error exceptions from leaving a greenlet. This can be done with a simple try/except construct: def func1(*args, **kwargs): try: return func(*args, **kwargs) except (A, B, C) as ex: return ex wrap_errors provides a shortcut to write that in one line: func1 = wrap_errors((A, B, C), func) It also preserves __str__ and __repr__ of the original function. """ def __init__(self, errors, func): """Make a new function from `func', such that it catches `errors' (an Exception subclass, or a tuple of Exception subclasses) and returns it as a value. """ self.errors = errors self.func = func def __call__(self, *args, **kwargs): try: return self.func(*args, **kwargs) except self.errors as ex: return ex def __str__(self): return str(self.func) def __repr__(self): return repr(self.func) def __getattr__(self, item): return getattr(self.func, item) class RunningProcSet(object): """Maintain a set of Procs that are still running, that is, automatically remove a proc when it's finished.
Provide a way to wait/kill all of them""" def __init__(self, *args): self.procs = set(*args) if args: for p in args[0]: p.link(lambda p: self.procs.discard(p)) def __len__(self): return len(self.procs) def __contains__(self, item): if isinstance(item, api.Greenlet): # special case for "api.getcurrent() in running_proc_set" to work for x in self.procs: if x.greenlet == item: return True else: return item in self.procs def __iter__(self): return iter(self.procs) def add(self, p): self.procs.add(p) p.link(lambda p: self.procs.discard(p)) def spawn(self, func, *args, **kwargs): p = spawn(func, *args, **kwargs) self.add(p) return p def waitall(self, trap_errors=True): while self.procs: waitall(self.procs, trap_errors=trap_errors) def killall(self, *throw_args, **kwargs): return killall(self.procs, *throw_args, **kwargs) class Pool(object): linkable_class = Proc def __init__(self, limit): self.semaphore = coros.Semaphore(limit) def allocate(self): self.semaphore.acquire() g = self.linkable_class() g.link(lambda *_args: self.semaphore.release()) return g if __name__=='__main__': import doctest doctest.testmod() diff --git a/eventlib/tpool.py b/eventlib/tpool.py index 8ee2b4a..d32a3cd 100644 --- a/eventlib/tpool.py +++ b/eventlib/tpool.py @@ -1,186 +1,188 @@ # Copyright (c) 2007, Linden Research, Inc. # Copyright (c) 2007, IBM Corp. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.
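# [editor's sketch, not part of the patch] Minimal usage of the execute()
# helper this module defines below: it ships a blocking callable to a native
# worker thread and suspends only the calling coroutine, so the hub keeps
# servicing other greenlets. fetch() and the URL are illustrative only.
from eventlib import tpool
import urllib.request

def fetch(url):
    # ordinary blocking I/O; acceptable here because it runs in a worker thread
    return urllib.request.urlopen(url).read()

body = tpool.execute(fetch, 'http://example.org/')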
import os import threading -from queue import Empty, Queue +from queue import Empty +from queue import _PySimpleQueue as SimpleQueue from eventlib import api, coros, greenio -QUIET=False +QUIET=True _rpipe, _wpipe = os.pipe() -_rfile = os.fdopen(_rpipe,"r",0) +_rfile = os.fdopen(_rpipe,"rb",0) ## Work whether or not wrap_pipe_with_coroutine_pipe was called if not isinstance(_rfile, greenio.GreenPipe): _rfile = greenio.GreenPipe(_rfile) def _signal_t2e(): from eventlib import util - nwritten = util.__original_write__(_wpipe, ' ') + nwritten = util.__original_write__(_wpipe, b' ') -_reqq = Queue(maxsize=-1) -_rspq = Queue(maxsize=-1) +_reqq = SimpleQueue() +_rspq = SimpleQueue() def tpool_trampoline(): + from eventlib import util global _reqq, _rspq while(True): _c = _rfile.recv(1) - assert(_c != "") + assert(_c != b'') while not _rspq.empty(): try: - (e,rv) = _rspq.get(block=False) + (e,rv) = _rspq.get() e.send(rv) rv = None except Empty: pass def esend(meth,*args, **kwargs): global _reqq, _rspq e = coros.event() _reqq.put((e,meth,args,kwargs)) return e SYS_EXCS = (KeyboardInterrupt, SystemExit) def tworker(): global _reqq, _rspq while(True): msg = _reqq.get() if msg is None: return (e,meth,args,kwargs) = msg rv = None try: rv = meth(*args,**kwargs) except SYS_EXCS: raise except Exception as exn: import sys (a,b,tb) = sys.exc_info() rv = (exn,a,b,tb) _rspq.put((e,rv)) meth = args = kwargs = e = rv = None _signal_t2e() def erecv(e): rv = e.wait() if isinstance(rv,tuple) and len(rv) == 4 and isinstance(rv[0],Exception): import traceback (e,a,b,tb) = rv if not QUIET: traceback.print_exception(Exception,e,tb) traceback.print_stack() raise e return rv def execute(meth,*args, **kwargs): """Execute method in a thread, blocking the current coroutine until the method completes. """ e = esend(meth,*args,**kwargs) rv = erecv(e) return rv ## TODO deprecate erpc = execute def proxy_call(autowrap, f, *args, **kwargs): """ Call a function *f* and returns the value. If the type of the return value is in the *autowrap* collection, then it is wrapped in a Proxy object before return. Normally *f* will be called nonblocking with the execute method; if the keyword argument "nonblocking" is set to true, it will simply be executed directly.""" if kwargs.pop('nonblocking',False): rv = f(*args, **kwargs) else: rv = execute(f,*args,**kwargs) if isinstance(rv, autowrap): return Proxy(rv, autowrap) else: return rv class Proxy(object): """ a simple proxy-wrapper of any object that comes with a methods-only interface, in order to forward every method invocation onto a thread in the native-thread pool. A key restriction is that the object's methods cannot call into eventlibs, since the eventlib dispatcher runs on a different native thread. This is for running native-threaded code only. 
""" def __init__(self, obj,autowrap=()): self._obj = obj self._autowrap = autowrap def __getattr__(self,attr_name): f = getattr(self._obj,attr_name) if not callable(f): return f def doit(*args, **kwargs): return proxy_call(self._autowrap, f, *args, **kwargs) return doit # the following are a buncha methods that the python interpeter # doesn't use getattr to retrieve and therefore have to be defined # explicitly def __getitem__(self, key): return proxy_call(self._autowrap, self._obj.__getitem__, key) def __setitem__(self, key, value): return proxy_call(self._autowrap, self._obj.__setitem__, key, value) def __deepcopy__(self, memo=None): return proxy_call(self._autowrap, self._obj.__deepcopy__, memo) def __copy__(self, memo=None): return proxy_call(self._autowrap, self._obj.__copy__, memo) # these don't go through a proxy call, because they're likely to # be called often, and are unlikely to be implemented on the # wrapped object in such a way that they would block def __eq__(self, rhs): return self._obj.__eq__(rhs) def __repr__(self): return self._obj.__repr__() def __str__(self): return self._obj.__str__() def __len__(self): return len(self._obj) def __bool__(self): return bool(self._obj) _nthreads = int(os.environ.get('EVENTLIB_THREADPOOL_SIZE', 20)) _threads = {} def setup(): global _threads for i in range(0,_nthreads): _threads[i] = threading.Thread(target=tworker) _threads[i].setDaemon(True) _threads[i].start() api.spawn(tpool_trampoline) setup() def killall(): for i in _threads: _reqq.put(None) for thr in list(_threads.values()): thr.join() diff --git a/eventlib/twistedutil/protocol.py b/eventlib/twistedutil/protocol.py index 1ee2c15..e24b618 100644 --- a/eventlib/twistedutil/protocol.py +++ b/eventlib/twistedutil/protocol.py @@ -1,438 +1,440 @@ # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """Basic twisted protocols converted to synchronous mode""" import sys from twisted.internet.protocol import Protocol as twistedProtocol from twisted.internet.error import ConnectionDone from twisted.internet.protocol import Factory, ClientFactory from twisted.internet import main from twisted.python import failure from eventlib import proc from eventlib.api import getcurrent -from eventlib.coros import queue, event +from eventlib.coros import queue, event, NOT_USED class ValueQueue(queue): """Queue that keeps the last item forever in the queue if it's an exception. 
Useful if you send an exception over queue only once, and once sent it must be always available. """ def send(self, value=None, exc=None): if exc is not None or not self.has_error(): queue.send(self, value, exc) def wait(self): """The difference from queue.wait: if there is an only item in the queue and it is an exception, raise it, but keep it in the queue, so that future calls to wait() will raise it again. """ self.sem.acquire() if self.has_error() and len(self.items)==1: # the last item, which is an exception, raise without emptying the queue self.sem.release() getcurrent().throw(*self.items[0][1]) else: result, exc = self.items.popleft() if exc is not None: getcurrent().throw(*exc) return result def has_error(self): return self.items and self.items[-1][1] is not None class Event(event): def send(self, value, exc=None): if self.ready(): self.reset() return event.send(self, value, exc) def send_exception(self, *throw_args): if self.ready(): self.reset() return event.send_exception(self, *throw_args) class Producer2Event(object): # implements IPullProducer def __init__(self, event): self.event = event def resumeProducing(self): self.event.send(1) def stopProducing(self): del self.event class GreenTransportBase(object): transportBufferSize = None def __init__(self, transportBufferSize=None): if transportBufferSize is not None: self.transportBufferSize = transportBufferSize self._queue = ValueQueue() self._write_event = Event() self._disconnected_event = Event() def build_protocol(self): return self.protocol_class(self) def _got_transport(self, transport): self._queue.send(transport) def _got_data(self, data): self._queue.send(data) def _connectionLost(self, reason): self._disconnected_event.send(reason.value) self._queue.send_exception(reason.value) self._write_event.send_exception(reason.value) def _wait(self): if self.disconnecting or self._disconnected_event.ready(): if self._queue: return self._queue.wait() else: raise self._disconnected_event.wait() self.resumeProducing() try: return self._queue.wait() finally: self.pauseProducing() def write(self, data, wait=True): + if not isinstance(data, bytes): + data = data.encode() if self._disconnected_event.ready(): raise self._disconnected_event.wait() - if wait: + if wait: # and self._write_event._result is not NOT_USED: self._write_event.reset() self.transport.write(data) self._write_event.wait() else: self.transport.write(data) def loseConnection(self, wait=True): self.transport.unregisterProducer() self.transport.loseConnection() if wait: self._disconnected_event.wait() def __getattr__(self, item): if item=='transport': raise AttributeError(item) if hasattr(self, 'transport'): try: return getattr(self.transport, item) except AttributeError: me = type(self).__name__ trans = type(self.transport).__name__ raise AttributeError("Neither %r nor %r has attribute %r" % (me, trans, item)) else: raise AttributeError(item) def resumeProducing(self): self.paused -= 1 if self.paused==0: self.transport.resumeProducing() def pauseProducing(self): self.paused += 1 if self.paused==1: self.transport.pauseProducing() def _init_transport_producer(self): self.transport.pauseProducing() self.paused = 1 def _init_transport(self): transport = self._queue.wait() self.transport = transport if self.transportBufferSize is not None: transport.bufferSize = self.transportBufferSize self._init_transport_producer() transport.registerProducer(Producer2Event(self._write_event), False) class Protocol(twistedProtocol): def __init__(self, recepient): self._recepient = 
recepient def connectionMade(self): self._recepient._got_transport(self.transport) def dataReceived(self, data): self._recepient._got_data(data) def connectionLost(self, reason): self._recepient._connectionLost(reason) class UnbufferedTransport(GreenTransportBase): """A very simple implementation of a green transport without an additional buffer""" protocol_class = Protocol def recv(self): """Receive a single chunk of undefined size. Return b'' if the connection was closed cleanly, raise the exception if it was closed in a non-clean fashion. After that all successive calls return b''. """ if self._disconnected_event.ready(): return b'' try: return self._wait() except ConnectionDone: return b'' def read(self): """Read the data from the socket until the connection is closed cleanly. If the connection was closed in a non-clean fashion, the appropriate exception is raised. In that case already received data is lost. Next time read() is called it returns b''. """ result = b'' while True: recvd = self.recv() if not recvd: break result += recvd return result # iterator protocol: def __iter__(self): return self def __next__(self): result = self.recv() if not result: raise StopIteration return result class GreenTransport(GreenTransportBase): protocol_class = Protocol - _buffer = '' + _buffer = b'' _error = None def read(self, size=-1): """Read size bytes or until EOF""" if not self._disconnected_event.ready(): try: while len(self._buffer) < size or size < 0: self._buffer += self._wait() except ConnectionDone: pass except: if not self._disconnected_event.has_exception(): raise if size>=0: result, self._buffer = self._buffer[:size], self._buffer[size:] else: - result, self._buffer = self._buffer, '' + result, self._buffer = self._buffer, b'' if not result and self._disconnected_event.has_exception(): try: self._disconnected_event.wait() except ConnectionDone: pass return result def recv(self, buflen=None): """Receive a single chunk of undefined size but no bigger than buflen""" if not self._disconnected_event.ready(): self.resumeProducing() try: try: recvd = self._wait() self._buffer += recvd except ConnectionDone: pass except: if not self._disconnected_event.has_exception(): raise finally: self.pauseProducing() if buflen is None: - result, self._buffer = self._buffer, '' + result, self._buffer = self._buffer, b'' else: result, self._buffer = self._buffer[:buflen], self._buffer[buflen:] if not result and self._disconnected_event.has_exception(): try: self._disconnected_event.wait() except ConnectionDone: pass return result # iterator protocol: def __iter__(self): return self def __next__(self): res = self.recv() if not res: raise StopIteration return res class GreenInstanceFactory(ClientFactory): noisy = False def __init__(self, instance, event): self.instance = instance self.event = event def buildProtocol(self, addr): return self.instance def clientConnectionFailed(self, connector, reason): self.event.send_exception(reason.type, reason.value, reason.tb) class GreenClientCreator(object): """Connect to a remote host and return a connected green transport instance.
""" gtransport_class = GreenTransport def __init__(self, reactor=None, gtransport_class=None, *args, **kwargs): if reactor is None: from twisted.internet import reactor self.reactor = reactor if gtransport_class is not None: self.gtransport_class = gtransport_class self.args = args self.kwargs = kwargs def _make_transport_and_factory(self): gtransport = self.gtransport_class(*self.args, **self.kwargs) protocol = gtransport.build_protocol() factory = GreenInstanceFactory(protocol, gtransport._queue) return gtransport, factory def connectTCP(self, host, port, *args, **kwargs): gtransport, factory = self._make_transport_and_factory() self.reactor.connectTCP(host, port, factory, *args, **kwargs) gtransport._init_transport() return gtransport def connectSSL(self, host, port, *args, **kwargs): gtransport, factory = self._make_transport_and_factory() self.reactor.connectSSL(host, port, factory, *args, **kwargs) gtransport._init_transport() return gtransport def connectTLS(self, host, port, *args, **kwargs): gtransport, factory = self._make_transport_and_factory() self.reactor.connectTLS(host, port, factory, *args, **kwargs) gtransport._init_transport() return gtransport def connectUNIX(self, address, *args, **kwargs): gtransport, factory = self._make_transport_and_factory() self.reactor.connectUNIX(address, factory, *args, **kwargs) gtransport._init_transport() return gtransport def connectSRV(self, service, domain, *args, **kwargs): SRVConnector = kwargs.pop('ConnectorClass', None) if SRVConnector is None: from twisted.names.srvconnect import SRVConnector gtransport, factory = self._make_transport_and_factory() c = SRVConnector(self.reactor, service, domain, factory, *args, **kwargs) c.connect() gtransport._init_transport() return gtransport class SimpleSpawnFactory(Factory): """Factory that spawns a new greenlet for each incoming connection. For an incoming connection a new greenlet is created using the provided callback as a function and a connected green transport instance as an argument. """ noisy = False gtransport_class = GreenTransport def __init__(self, handler, gtransport_class=None, *args, **kwargs): if callable(handler): self.handler = handler else: self.handler = handler.send if hasattr(handler, 'send_exception'): self.exc_handler = handler.send_exception if gtransport_class is not None: self.gtransport_class = gtransport_class self.args = args self.kwargs = kwargs def exc_handler(self, *args): pass def buildProtocol(self, addr): gtransport = self.gtransport_class(*self.args, **self.kwargs) protocol = gtransport.build_protocol() protocol.factory = self self._do_spawn(gtransport, protocol) return protocol def _do_spawn(self, gtransport, protocol): proc.spawn_greenlet(self._run_handler, gtransport, protocol) def _run_handler(self, gtransport, protocol): try: gtransport._init_transport() except Exception: self.exc_handler(*sys.exc_info()) else: self.handler(gtransport) class SpawnFactory(SimpleSpawnFactory): """An extension to SimpleSpawnFactory that provides some control over the greenlets it has spawned. 
""" def __init__(self, handler, gtransport_class=None, *args, **kwargs): self.greenlets = set() SimpleSpawnFactory.__init__(self, handler, gtransport_class, *args, **kwargs) def _do_spawn(self, gtransport, protocol): g = proc.spawn(self._run_handler, gtransport, protocol) self.greenlets.add(g) g.link(lambda *_: self.greenlets.remove(g)) def waitall(self): return proc.waitall(self.greenlets) diff --git a/eventlib/util.py b/eventlib/util.py index c79797e..d230ad3 100644 --- a/eventlib/util.py +++ b/eventlib/util.py @@ -1,175 +1,183 @@ # @author Bob Ippolito # # Copyright (c) 2005-2006, Bob Ippolito # Copyright (c) 2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import os import select import socket import errno import sys +import ssl def g_log(*args): import sys from eventlib.support import greenlets as greenlet g_id = id(greenlet.getcurrent()) if g_id is None: if greenlet.getcurrent().parent is None: ident = 'greenlet-main' else: g_id = id(greenlet.getcurrent()) if g_id < 0: g_id += 1 + ((sys.maxsize + 1) << 1) ident = '%08X' % (g_id,) else: ident = 'greenlet-%d' % (g_id,) print('[%s] %s' % (ident, ' '.join(map(str, args))), file=sys.stderr) __original_socket__ = socket.socket __original_gethostbyname__ = socket.gethostbyname __original_getaddrinfo__ = socket.getaddrinfo if sys.platform != 'win32': __original_fromfd__ = socket.fromfd def tcp_socket(): s = __original_socket__(socket.AF_INET, socket.SOCK_STREAM) return s __original_fdopen__ = os.fdopen __original_read__ = os.read __original_write__ = os.write __original_waitpid__ = os.waitpid if sys.platform != 'win32': __original_fork__ = os.fork ## TODO wrappings for popen functions? not really needed since Process object exists? 
pipes_already_wrapped = False def wrap_pipes_with_coroutine_pipes(): from eventlib import processes ## Make sure the signal handler is installed global pipes_already_wrapped if pipes_already_wrapped: return def new_fdopen(*args, **kw): from eventlib import greenio return greenio.GreenPipe(__original_fdopen__(*args, **kw)) def new_read(fd, *args, **kw): from eventlib import api try: api.trampoline(fd, read=True) except socket.error as e: if e[0] == errno.EPIPE: return '' else: raise return __original_read__(fd, *args, **kw) def new_write(fd, *args, **kw): from eventlib import api api.trampoline(fd, write=True) return __original_write__(fd, *args, **kw) if sys.platform != 'win32': def new_fork(*args, **kwargs): pid = __original_fork__() if pid: processes._add_child_pid(pid) return pid os.fork = new_fork def new_waitpid(pid, options): from eventlib import processes evt = processes.CHILD_EVENTS.get(pid) if not evt: return 0, 0 if options == os.WNOHANG: if evt.ready(): return pid, evt.wait() return 0, 0 elif options: return __original_waitpid__(pid, options) return pid, evt.wait() os.fdopen = new_fdopen os.read = new_read os.write = new_write os.waitpid = new_waitpid __original_select__ = select.select try: import threading __original_threadlocal__ = threading.local except ImportError: pass +def wrap_ssl(sock, certificate=None, private_key=None): + return ssl.wrap_socket(sock, + keyfile=private_key, certfile=certificate, + server_side=False, cert_reqs=ssl.CERT_NONE, + ssl_version=ssl.PROTOCOL_SSLv23, ca_certs=None, + do_handshake_on_connect=False, + suppress_ragged_eofs=True) def wrap_threading_local_with_coro_local(): """monkey patch threading.local with something that is greenlet aware. Since greenlets cannot cross threads, so this should be semantically identical to threadlocal.local """ from eventlib import api def get_ident(): return id(api.getcurrent()) class local(object): def __init__(self): self.__dict__['__objs'] = {} def __getattr__(self, attr, g=get_ident): try: return self.__dict__['__objs'][g()][attr] except KeyError: raise AttributeError( "No variable %s defined for the thread %s" % (attr, g())) def __setattr__(self, attr, value, g=get_ident): self.__dict__['__objs'].setdefault(g(), {})[attr] = value def __delattr__(self, attr, g=get_ident): try: del self.__dict__['__objs'][g()][attr] except KeyError: raise AttributeError( "No variable %s defined for thread %s" % (attr, g())) threading.local = local def socket_bind_and_listen(sock, addr=('', 0), backlog=50): set_reuse_addr(sock) sock.bind(addr) sock.listen(backlog) return sock def set_reuse_addr(sock): try: sock.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1, ) except socket.error: pass diff --git a/eventlib/wsgi.py b/eventlib/wsgi.py index 83dbca2..108cd3c 100644 --- a/eventlib/wsgi.py +++ b/eventlib/wsgi.py @@ -1,413 +1,414 @@ # @author Bob Ippolito # # Copyright (c) 2005-2006, Bob Ippolito # Copyright (c) 2007, Linden Research, Inc. 
# Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import errno import os import sys import time import traceback +import urllib.parse as urllib_parse from eventlib.green import urllib from eventlib.green import socket from eventlib.green import BaseHTTPServer from eventlib import api from eventlib.pool import Pool DEFAULT_MAX_SIMULTANEOUS_REQUESTS = 1024 DEFAULT_MAX_HTTP_VERSION = 'HTTP/1.1' # Weekday and month names for HTTP date/time formatting; always English! _weekdayname = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] _monthname = [None, # Dummy so we can use 1-based month numbers "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] def format_date_time(timestamp): year, month, day, hh, mm, ss, wd, y, z = time.gmtime(timestamp) return "%s, %02d %3s %4d %02d:%02d:%02d GMT" % ( _weekdayname[wd], day, _monthname[month], year, hh, mm, ss ) class Input(object): def __init__(self, rfile, content_length, wfile=None, wfile_line=None): self.rfile = rfile if content_length is not None: content_length = int(content_length) self.content_length = content_length self.wfile = wfile self.wfile_line = wfile_line self.position = 0 def _do_read(self, reader, length=None): if self.wfile is not None: ## 100 Continue self.wfile.write(self.wfile_line) self.wfile = None self.wfile_line = None if length is None and self.content_length is not None: length = self.content_length - self.position if length and length > self.content_length - self.position: length = self.content_length - self.position if not length: return '' read = reader(length) self.position += len(read) return read def read(self, length=None): return self._do_read(self.rfile.read, length) def readline(self): return self._do_read(self.rfile.readline) def readlines(self, hint=None): return self._do_read(self.rfile.readlines, hint) def __iter__(self): return iter(self.read()) MAX_REQUEST_LINE = 8192 MINIMUM_CHUNK_SIZE = 4096 class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): protocol_version = 'HTTP/1.1' minimum_chunk_size = MINIMUM_CHUNK_SIZE def handle_one_request(self): if self.server.max_http_version: self.protocol_version = self.server.max_http_version if self.rfile.closed: self.close_connection = 1 return try: self.raw_requestline = self.rfile.readline(MAX_REQUEST_LINE) if len(self.raw_requestline) == MAX_REQUEST_LINE: self.wfile.write( "HTTP/1.0 414 Request URI Too Long\r\nConnection: close\r\nContent-length: 0\r\n\r\n") self.close_connection = 1 return except socket.error as e: if e.errno != errno.EBADF: raise
self.raw_requestline = '' if not self.raw_requestline: self.close_connection = 1 return if not self.parse_request(): return self.environ = self.get_environ() self.application = self.server.app try: self.server.outstanding_requests += 1 try: self.handle_one_response() except socket.error as e: # Broken pipe, connection reset by peer if e.errno in (32, 54): pass else: raise finally: self.server.outstanding_requests -= 1 def handle_one_response(self): start = time.time() headers_set = [] headers_sent = [] # set of lowercase header names that were sent header_dict = {} wfile = self.wfile num_blocks = None result = None use_chunked = False length = [0] status_code = [200] def write(data, _writelines=wfile.writelines): towrite = [] if not headers_set: raise AssertionError("write() before start_response()") elif not headers_sent: status, response_headers = headers_set headers_sent.append(1) for k, v in response_headers: header_dict[k.lower()] = k towrite.append('%s %s\r\n' % (self.protocol_version, status)) for header in response_headers: towrite.append('%s: %s\r\n' % header) # send Date header? if 'date' not in header_dict: towrite.append('Date: %s\r\n' % (format_date_time(time.time()),)) if num_blocks is not None: if 'content-length' not in header_dict: towrite.append('Content-Length: %s\r\n' % (len(''.join(result)),)) elif use_chunked: towrite.append('Transfer-Encoding: chunked\r\n') else: towrite.append('Connection: close\r\n') self.close_connection = 1 towrite.append('\r\n') if use_chunked: ## Write the chunked encoding towrite.append("%x\r\n%s\r\n" % (len(data), data)) else: towrite.append(data) try: _writelines(towrite) length[0] = length[0] + sum(map(len, towrite)) except UnicodeEncodeError: print("Encountered unicode while attempting to write wsgi response: ", [x for x in towrite if isinstance(x, str)]) traceback.print_exc() _writelines( ["HTTP/1.0 500 Internal Server Error\r\n", "Connection: close\r\n", "Content-type: text/plain\r\n", "Content-length: 98\r\n", "\r\n", "Internal Server Error: wsgi application passed a unicode object to the server instead of a string."]) def start_response(status, response_headers, exc_info=None): status_code[0] = status.split()[0] if exc_info: try: if headers_sent: # Re-raise original exception if headers sent raise exc_info[0](exc_info[1]).with_traceback(exc_info[2]) finally: # Avoid dangling circular ref exc_info = None capitalized_headers = [('-'.join([x.capitalize() for x in key.split('-')]), value) for key, value in response_headers] headers_set[:] = [status, capitalized_headers] return write try: result = self.application(self.environ, start_response) except Exception as e: exc = ''.join(traceback.format_exception(*sys.exc_info())) print(exc) if not headers_set: start_response("500 Internal Server Error", [('Content-type', 'text/plain')]) write(exc) return try: num_blocks = len(result) except (TypeError, AttributeError, NotImplementedError): if self.request_version == 'HTTP/1.1': use_chunked = True try: try: towrite = [] try: for data in result: if data: towrite.append(data) if use_chunked and sum(map(len, towrite)) > self.minimum_chunk_size: write(''.join(towrite)) del towrite[:] except Exception as e: exc = traceback.format_exc() print(exc) if not headers_set: start_response("500 Internal Server Error", [('Content-type', 'text/plain')]) write(exc) return if towrite: write(''.join(towrite)) if not headers_sent: write('') if use_chunked: wfile.write('0\r\n\r\n') except Exception as e: traceback.print_exc() finally: if hasattr(result, 'close'):
result.close() if self.environ['eventlib.input'].position < int(self.environ.get('CONTENT_LENGTH', 0)): ## Read and discard body self.environ['eventlib.input'].read() finish = time.time() self.server.log_message('%s - - [%s] "%s" %s %s %.6f\n' % ( self.client_address[0], self.log_date_time_string(), self.requestline, status_code[0], length[0], finish - start)) def get_environ(self): env = self.server.get_environ() env['REQUEST_METHOD'] = self.command env['SCRIPT_NAME'] = '' if '?' in self.path: path, query = self.path.split('?', 1) else: path, query = self.path, '' - env['PATH_INFO'] = urllib.parse.unquote(path) + env['PATH_INFO'] = urllib_parse.unquote(path) env['QUERY_STRING'] = query - if self.headers.typeheader is None: - env['CONTENT_TYPE'] = self.headers.type + if self.headers.get('content-type') is None: + env['CONTENT_TYPE'] = self.headers.get_content_type() else: - env['CONTENT_TYPE'] = self.headers.typeheader - - length = self.headers.getheader('content-length') + env['CONTENT_TYPE'] = self.headers['content-type'] + length = self.headers.get('content-length') if length: env['CONTENT_LENGTH'] = length + env['SERVER_PROTOCOL'] = 'HTTP/1.0' host, port = self.request.getsockname() env['SERVER_NAME'] = host env['SERVER_PORT'] = str(port) env['REMOTE_ADDR'] = self.client_address[0] env['GATEWAY_INTERFACE'] = 'CGI/1.1' - for h in self.headers.headers: - k, v = h.split(':', 1) + for k in self.headers: + v = self.headers.get(k) k = k.replace('-', '_').upper() v = v.strip() if k in env: continue envk = 'HTTP_' + k if envk in env: env[envk] += ',' + v else: env[envk] = v if env.get('HTTP_EXPECT') == '100-continue': wfile = self.wfile wfile_line = 'HTTP/1.1 100 Continue\r\n\r\n' else: wfile = None wfile_line = None env['wsgi.input'] = env['eventlib.input'] = Input( self.rfile, length, wfile=wfile, wfile_line=wfile_line) return env def finish(self): BaseHTTPServer.BaseHTTPRequestHandler.finish(self) self.connection.close() class Server(BaseHTTPServer.HTTPServer): def __init__(self, socket, address, app, log=None, environ=None, max_http_version=None, protocol=HttpProtocol, minimum_chunk_size=None): self.outstanding_requests = 0 self.socket = socket self.address = address if log: self.log = log else: self.log = sys.stderr self.app = app self.environ = environ self.max_http_version = max_http_version self.protocol = protocol self.pid = os.getpid() if minimum_chunk_size is not None: protocol.minimum_chunk_size = minimum_chunk_size def get_environ(self): socket = self.socket d = { 'wsgi.errors': sys.stderr, 'wsgi.version': (1, 0), 'wsgi.multithread': True, 'wsgi.multiprocess': False, 'wsgi.run_once': False, 'wsgi.url_scheme': 'http', } if self.environ is not None: d.update(self.environ) return d - def process_request(self, xxx_todo_changeme): - (socket, address) = xxx_todo_changeme + def process_request(self, req): + (socket, address) = req proto = self.protocol(socket, address, self) proto.handle() def log_message(self, message): self.log.write(message + '\n') def server(sock, site, log=None, environ=None, max_size=None, max_http_version=DEFAULT_MAX_HTTP_VERSION, protocol=HttpProtocol, server_event=None, minimum_chunk_size=None): serv = Server(sock, sock.getsockname(), site, log, environ=environ, max_http_version=max_http_version, protocol=protocol, minimum_chunk_size=minimum_chunk_size) if server_event is not None: server_event.send(serv) if max_size is None: max_size = DEFAULT_MAX_SIMULTANEOUS_REQUESTS pool = Pool(max_size=max_size) try: host, port = sock.getsockname() port = ':%s' % (port, ) if
sock.is_secure: scheme = 'https' if port == ':443': port = '' else: scheme = 'http' if port == ':80': port = '' print("(%s) wsgi starting up on %s://%s%s/" % (os.getpid(), scheme, host, port)) while True: try: try: client_socket = sock.accept() except socket.error as e: if e.errno != errno.EPIPE and e.errno != errno.EBADF: raise pool.execute_async(serv.process_request, client_socket) except KeyboardInterrupt: api.get_hub().remove_descriptor(sock.fileno()) print("wsgi exiting") break finally: try: sock.close() except socket.error as e: if e.errno != errno.EPIPE: raise diff --git a/examples/connect.py b/examples/connect.py index b53bc98..abe4a69 100644 --- a/examples/connect.py +++ b/examples/connect.py @@ -1,52 +1,54 @@ # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """Spawn multiple workers and collect their results. Demonstrates how to use eventlib.green package and proc module. """ from eventlib import proc from eventlib.green import socket # this example works both with standard eventlib hubs and with the twisted-based hub # uncomment the following line to use twisted hub #from twisted.internet import reactor def geturl(url): c = socket.socket() ip = socket.gethostbyname(url) c.connect((ip, 80)) print('%s connected' % url) - c.send('GET /\r\n\r\n') + c.send(b'GET /\r\n\r\n') return c.recv(1024) -urls = ['www.google.com', 'www.yandex.ru', 'www.python.org', 'ag-projects.com', 'sylkserver.com'] +urls = ['www.google.com', 'www.python.org', 'www.yandex.ru'] jobs = [proc.spawn(geturl, x) for x in urls] print('spawned %s jobs' % len(jobs)) # collect the results from workers results = proc.waitall(jobs) # Note that any exception in the workers will be reraised by waitall # unless the trap_errors argument specifies otherwise for url, result in zip(urls, results): print('%s: %s' % (url, repr(result)[:50])) diff --git a/examples/echoserver.py b/examples/echoserver.py index 7aea415..4daa6c4 100755 --- a/examples/echoserver.py +++ b/examples/echoserver.py @@ -1,55 +1,55 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 """\ @file echoserver.py Simple server that listens on port 6000 and echoes back every input to the client. To try out the server, start it up by running this file.
Connect to it with: telnet localhost 6000 You terminate your connection by terminating telnet (typically Ctrl-] and then 'quit') Copyright (c) 2007, Linden Research, Inc. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from eventlib import api def handle_socket(reader, writer): print("client connected") while True: # pass through every non-eof line x = reader.readline() if not x: break writer.write(x) print("echoed", x) print("client disconnected") print("server socket listening on port 6000") server = api.tcp_listener(('0.0.0.0', 6000)) while True: try: new_sock, address = server.accept() except KeyboardInterrupt: break # handle every new connection with a new coroutine api.spawn(handle_socket, new_sock.makefile('r'), new_sock.makefile('w')) diff --git a/examples/twisted_client.py b/examples/twisted_client.py index 39544c4..e0c3886 100644 --- a/examples/twisted_client.py +++ b/examples/twisted_client.py @@ -1,26 +1,30 @@ """Example for GreenTransport and GreenClientCreator. In this example the reactor is started implicitly upon the first use of a blocking function.
""" from twisted.internet import ssl from twisted.internet.error import ConnectionClosed from eventlib.twistedutil.protocol import GreenClientCreator from eventlib.twistedutil.protocols.basic import LineOnlyReceiverTransport from twisted.internet import reactor +print("\n\nRead from TCP connection\n\n") + # read from TCP connection conn = GreenClientCreator(reactor).connectTCP('www.google.com', 80) -conn.write('GET / HTTP/1.0\r\n\r\n') +conn.write('GET /not_found HTTP/1.0\r\n\r\n') conn.loseWriteConnection() -print(conn.read()) +print(conn.read().decode('utf-8')) + +print("\n\nRead from SSL connection line by line\n\n") # read from SSL connection line by line -conn = GreenClientCreator(reactor, LineOnlyReceiverTransport).connectSSL('sf.net', 443, ssl.ClientContextFactory()) -conn.write('GET / HTTP/1.0\r\n\r\n') +conn = GreenClientCreator(reactor, LineOnlyReceiverTransport).connectSSL('ssltest.com', 443, ssl.ClientContextFactory()) +conn.write('GET /not_found HTTP/1.0\r\n\r\n') try: for num, line in enumerate(conn): print('%3s %r' % (num, line)) except ConnectionClosed as ex: print(ex) diff --git a/examples/twisted_portforward.py b/examples/twisted_portforward.py index a2023e1..3cfde3e 100755 --- a/examples/twisted_portforward.py +++ b/examples/twisted_portforward.py @@ -1,59 +1,59 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
"""Port forwarder USAGE: twisted_portforward.py local_port remote_host remote_port""" import sys from twisted.internet import reactor from eventlib.twistedutil import join_reactor from eventlib.twistedutil.protocol import GreenClientCreator, SpawnFactory, UnbufferedTransport from eventlib import proc def forward(source, dest): try: while True: x = source.recv() if not x: break print('forwarding %s bytes' % len(x)) dest.write(x) finally: dest.loseConnection() def handler(local): client = str(local.getHost()) print('accepted connection from %s' % client) remote = GreenClientCreator(reactor, UnbufferedTransport).connectTCP(remote_host, remote_port) a = proc.spawn(forward, remote, local) b = proc.spawn(forward, local, remote) proc.waitall([a, b], trap_errors=True) print('closed connection to %s' % client) try: local_port, remote_host, remote_port = sys.argv[1:] except ValueError: sys.exit(__doc__) local_port = int(local_port) remote_port = int(remote_port) reactor.listenTCP(local_port, SpawnFactory(handler)) reactor.run() diff --git a/greentest/api_test.py b/greentest/api_test.py index 0e990ee..73e70d9 100644 --- a/greentest/api_test.py +++ b/greentest/api_test.py @@ -1,228 +1,228 @@ # @author Donovan Preston # # Copyright (c) 2006-2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import os import os.path from unittest import TestCase, main from eventlib import api from eventlib import greenio from eventlib import util def check_hub(): # Clear through the descriptor queue api.sleep(0) api.sleep(0) hub = api.get_hub() for nm in 'get_readers', 'get_writers', 'get_excs': dct = getattr(hub, nm)() assert not dct, "hub.%s not empty: %s" % (nm, dct) # Stop the runloop (unless it's twistedhub which does not support that) if not getattr(api.get_hub(), 'uses_twisted_reactor', None): api.get_hub().abort() api.sleep(0) ### ??? 
assert not api.get_hub().running class TestApi(TestCase): mode = 'static' certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') def test_tcp_listener(self): socket = api.tcp_listener(('0.0.0.0', 0)) assert socket.getsockname()[0] == '0.0.0.0' socket.close() check_hub() def test_connect_tcp(self): def accept_once(listenfd): try: conn, addr = listenfd.accept() fd = conn.makeGreenFile() conn.close() fd.write('hello\n') fd.close() finally: listenfd.close() server = api.tcp_listener(('0.0.0.0', 0)) api.spawn(accept_once, server) client = api.connect_tcp(('127.0.0.1', server.getsockname()[1])) fd = client.makeGreenFile() client.close() - assert fd.readline() == 'hello\n' - + assert fd.readline() == b'hello\n' assert fd.read() == '' fd.close() check_hub() def test_connect_ssl(self): def accept_once(listenfd): try: + listenfd.do_handshake_on_connect=False conn, addr = listenfd.accept() fl = conn.makeGreenFile('w') fl.write('hello\r\n') fl.close() conn.close() finally: listenfd.close() server = api.ssl_listener(('0.0.0.0', 0), self.certificate_file, self.private_key_file) api.spawn(accept_once, server) client = util.wrap_ssl( - api.connect_tcp(('127.0.0.1', server.getsockname()[1]))) - client = client.makeGreenFile() + api.connect_tcp(('127.0.0.1', server.getsockname()[1])), self.certificate_file, self.private_key_file ) + #client = client.makeGreenFile() - assert client.readline() == 'hello\r\n' + #assert client.readline() == b'hello\r\n' assert client.read() == '' client.close() def test_server(self): connected = [] server = api.tcp_listener(('0.0.0.0', 0)) bound_port = server.getsockname()[1] - def accept_twice(xxx_todo_changeme): - (conn, addr) = xxx_todo_changeme + def accept_twice(client): + (conn, addr) = client connected.append(True) conn.close() if len(connected) == 2: server.close() api.call_after(0, api.connect_tcp, ('127.0.0.1', bound_port)) api.call_after(0, api.connect_tcp, ('127.0.0.1', bound_port)) try: api.tcp_server(server, accept_twice) except: api.sleep(0.1) raise assert len(connected) == 2 check_hub() def test_001_trampoline_timeout(self): server = api.tcp_listener(('0.0.0.0', 0)) bound_port = server.getsockname()[1] try: desc = greenio.GreenSocket(util.tcp_socket()) desc.connect(('127.0.0.1', bound_port)) api.trampoline(desc, read=True, write=False, timeout=0.1) except api.TimeoutError: pass # test passed else: assert False, "Didn't timeout" check_hub() def test_timeout_cancel(self): server = api.tcp_listener(('0.0.0.0', 0)) bound_port = server.getsockname()[1] - def client_connected(xxx_todo_changeme1): - (conn, addr) = xxx_todo_changeme1 + def client_connected(server): + (conn, addr) = server conn.close() def go(): client = util.tcp_socket() desc = greenio.GreenSocket(client) desc.connect(('127.0.0.1', bound_port)) try: api.trampoline(desc, read=True, write=True, timeout=0.1) except api.TimeoutError: assert False, "Timed out" server.close() client.close() api.call_after(0, go) api.tcp_server(server, client_connected) check_hub() if not getattr(api.get_hub(), 'uses_twisted_reactor', None): def test_explicit_hub(self): oldhub = api.get_hub() try: api.use_hub(Foo) assert isinstance(api.get_hub(), Foo), api.get_hub() finally: api._threadlocal.hub = oldhub check_hub() def test_named(self): named_foo = api.named('api_test.Foo') self.assertEqual( named_foo.__name__, "Foo") def test_naming_missing_class(self): self.assertRaises( ImportError, api.named, 
'this_name_should_hopefully_not_exist.Foo') def test_timeout_and_final_write(self): # This test verifies that a write on a socket that we've # stopped listening for doesn't result in an incorrect switch rpipe, wpipe = os.pipe() - rfile = os.fdopen(rpipe,"r",0) + rfile = os.fdopen(rpipe,"rb",1) wrap_rfile = greenio.GreenPipe(rfile) - wfile = os.fdopen(wpipe,"w",0) + wfile = os.fdopen(wpipe,"wb",1) wrap_wfile = greenio.GreenPipe(wfile) def sender(evt): api.sleep(0.02) wrap_wfile.write('hi') evt.send('sent via event') from eventlib import coros evt = coros.event() api.spawn(sender, evt) try: # try and get some data off of this pipe # but bail before any is sent api.exc_after(0.01, api.TimeoutError) _c = wrap_rfile.read(1) self.fail() except api.TimeoutError: pass result = evt.wait() self.assertEqual(result, 'sent via event') class Foo(object): pass if __name__ == '__main__': main() diff --git a/greentest/coros_test.py b/greentest/coros_test.py index 9ae7a9e..49960a1 100644 --- a/greentest/coros_test.py +++ b/greentest/coros_test.py @@ -1,222 +1,222 @@ # @author Donovan Preston, Ryan Williams # # Copyright (c) 2000-2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
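# [editor's sketch, not part of the patch] The send/wait/reset contract the
# tests below exercise, in miniature: wait() returns the value passed to
# send(), resending without reset() is an AssertionError, and reset()
# re-arms the event for reuse.
from eventlib import api, coros

evt = coros.event()
api.spawn(evt.send, 'hello')
assert evt.wait() == 'hello'   # every waiter sees the same value
evt.reset()                    # required before send() may be called again
api.spawn(evt.send, 'again')
assert evt.wait() == 'again'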
from unittest import TestCase, main from eventlib import coros, api class TestEvent(TestCase): mode = 'static' def setUp(self): # raise an exception if we're waiting forever self._cancel_timeout = api.exc_after(1, RuntimeError('test takes too long')) def tearDown(self): self._cancel_timeout.cancel() def test_waiting_for_event(self): evt = coros.event() value = 'some stuff' def send_to_event(): evt.send(value) api.spawn(send_to_event) self.assertEqual(evt.wait(), value) def test_multiple_waiters(self): evt = coros.event() value = 'some stuff' results = [] def wait_on_event(i_am_done): evt.wait() results.append(True) i_am_done.send() waiters = [] count = 5 for i in range(count): waiters.append(coros.event()) api.spawn(wait_on_event, waiters[-1]) evt.send() for w in waiters: w.wait() self.assertEqual(len(results), count) def test_reset(self): evt = coros.event() # calling reset before send should throw self.assertRaises(AssertionError, evt.reset) value = 'some stuff' def send_to_event(): evt.send(value) api.spawn(send_to_event) self.assertEqual(evt.wait(), value) # now try it again, and we should get the same exact value, # and we shouldn't be allowed to resend without resetting value2 = 'second stuff' self.assertRaises(AssertionError, evt.send, value2) self.assertEqual(evt.wait(), value) # reset and everything should be happy evt.reset() def send_to_event2(): evt.send(value2) api.spawn(send_to_event2) self.assertEqual(evt.wait(), value2) def test_double_exception(self): evt = coros.event() # send an exception through the event evt.send(exc=RuntimeError('from test_double_exception')) self.assertRaises(RuntimeError, evt.wait) evt.reset() # shouldn't see the RuntimeError again api.exc_after(0.001, api.TimeoutError('from test_double_exception')) self.assertRaises(api.TimeoutError, evt.wait) class IncrActor(coros.Actor): def received(self, evt): self.value = getattr(self, 'value', 0) + 1 if evt: evt.send() class TestActor(TestCase): mode = 'static' def setUp(self): # raise an exception if we're waiting forever self._cancel_timeout = api.exc_after(1, api.TimeoutError()) self.actor = IncrActor() def tearDown(self): self._cancel_timeout.cancel() api.kill(self.actor._killer) def test_cast(self): evt = coros.event() self.actor.cast(evt) evt.wait() evt.reset() self.assertEqual(self.actor.value, 1) self.actor.cast(evt) evt.wait() self.assertEqual(self.actor.value, 2) def test_cast_multi_1(self): # make sure that both messages make it in there evt = coros.event() evt1 = coros.event() self.actor.cast(evt) self.actor.cast(evt1) evt.wait() evt1.wait() self.assertEqual(self.actor.value, 2) def test_cast_multi_2(self): # the actor goes through a slightly different code path if it # is forced to enter its event loop prior to any cast()s api.sleep(0) self.test_cast_multi_1() def test_sleeping_during_received(self): # ensure that even if the received method cooperatively # yields, eventually all messages are delivered msgs = [] waiters = [] - def received(xxx_todo_changeme ): - (message, evt) = xxx_todo_changeme + def received(rcvd): + (message, evt) = rcvd api.sleep(0) msgs.append(message) evt.send() self.actor.received = received waiters.append(coros.event()) self.actor.cast( (1, waiters[-1])) api.sleep(0) waiters.append(coros.event()) self.actor.cast( (2, waiters[-1]) ) waiters.append(coros.event()) self.actor.cast( (3, waiters[-1]) ) api.sleep(0) waiters.append(coros.event()) self.actor.cast( (4, waiters[-1]) ) waiters.append(coros.event()) self.actor.cast( (5, waiters[-1]) ) for evt in waiters: evt.wait() 
self.assertEqual(msgs, [1,2,3,4,5]) def test_raising_received(self): msgs = [] - def received(xxx_todo_changeme1 ): - (message, evt) = xxx_todo_changeme1 + def received(rcvd): + (message, evt) = rcvd evt.send() if message == 'fail': raise RuntimeError() else: msgs.append(message) self.actor.received = received evt = coros.event() self.actor.cast( ('fail', evt) ) evt.wait() evt.reset() self.actor.cast( ('should_appear', evt) ) evt.wait() self.assertEqual(['should_appear'], msgs) def test_multiple(self): self.actor = IncrActor(concurrency=2) total = [0] - def received(xxx_todo_changeme2 ): - (func, ev, value) = xxx_todo_changeme2 + def received(rcvd): + (func, ev, value) = rcvd func() total[0] += value ev.send() self.actor.received = received def onemoment(): api.sleep(0.1) evt = coros.event() evt1 = coros.event() self.actor.cast( (onemoment, evt, 1) ) self.actor.cast( (lambda: None, evt1, 2) ) evt1.wait() self.assertEqual(total[0], 2) # both coroutines should have been used self.assertEqual(self.actor._pool.current_size, 2) api.sleep(0) self.assertEqual(self.actor._pool.free(), 1) evt.wait() self.assertEqual(total[0], 3) api.sleep(0) self.assertEqual(self.actor._pool.free(), 2) if __name__ == '__main__': main() diff --git a/greentest/db_pool_test.py b/greentest/db_pool_test.py index ff6382b..51d3968 100755 --- a/greentest/db_pool_test.py +++ b/greentest/db_pool_test.py @@ -1,534 +1,534 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # @file test_mysql_pool.py # @brief Test cases for mysql_pool # # Copyright (c) 2007, Linden Research, Inc. # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
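# [editor's sketch, not part of the patch] The borrow/return discipline these
# tests verify. The ConnectionPool constructor arguments are hypothetical
# (the tests build pools through a create_pool() helper defined elsewhere).
from eventlib import db_pool
import MySQLdb

pool = db_pool.ConnectionPool(MySQLdb, db='test', max_size=2)
conn = pool.get()          # blocks when all connections are checked out
try:
    curs = conn.cursor()
    curs.execute('select 1')
finally:
    pool.put(conn)         # close() or deletion also does a put, per the tests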
from unittest import TestCase, main from eventlib import api, coros from eventlib import db_pool class DBTester(object): def setUp(self): self.create_db() self.connection = None connection = self._dbmodule.connect(**self._auth) cursor = connection.cursor() cursor.execute("""CREATE TABLE gargleblatz ( a INTEGER ) ENGINE = InnoDB;""") connection.commit() cursor.close() def tearDown(self): if self.connection: self.connection.close() self.drop_db() def set_up_test_table(self, connection = None): close_connection = False if connection is None: close_connection = True if self.connection is None: connection = self._dbmodule.connect(**self._auth) else: connection = self.connection cursor = connection.cursor() cursor.execute("""CREATE TEMPORARY TABLE test_table ( row_id INTEGER PRIMARY KEY AUTO_INCREMENT, value_int INTEGER, value_float FLOAT, value_string VARCHAR(200), value_uuid CHAR(36), value_binary BLOB, value_binary_string VARCHAR(200) BINARY, value_enum ENUM('Y','N'), created TIMESTAMP ) ENGINE = InnoDB;""") connection.commit() cursor.close() if close_connection: connection.close() # silly mock class class Mock(object): pass class TestDBConnectionPool(DBTester): def setUp(self): super(TestDBConnectionPool, self).setUp() self.pool = self.create_pool() self.connection = self.pool.get() def tearDown(self): if self.connection: self.pool.put(self.connection) super(TestDBConnectionPool, self).tearDown() def assert_cursor_works(self, cursor): # TODO: this is pretty mysql-specific cursor.execute("show full processlist") rows = cursor.fetchall() self.assertTrue(rows) def test_connecting(self): self.assertTrue(self.connection is not None) def test_create_cursor(self): cursor = self.connection.cursor() cursor.close() def test_run_query(self): cursor = self.connection.cursor() self.assert_cursor_works(cursor) cursor.close() def test_run_bad_query(self): cursor = self.connection.cursor() try: cursor.execute("garbage blah blah") self.assertTrue(False) except AssertionError: raise except Exception as e: pass cursor.close() def test_put_none(self): # the pool is of size 1, and its only connection is out self.assertTrue(self.pool.free() == 0) self.pool.put(None) # ha ha we fooled it into thinking that we had a dead process self.assertTrue(self.pool.free() == 1) conn2 = self.pool.get() self.assertTrue(conn2 is not None) self.assertTrue(conn2.cursor) del conn2 def test_close_does_a_put(self): self.assertTrue(self.pool.free() == 0) self.connection.close() self.assertTrue(self.pool.free() == 1) self.assertRaises(AttributeError, self.connection.cursor) def test_deletion_does_a_put(self): self.assertTrue(self.pool.free() == 0) self.connection = None self.assertTrue(self.pool.free() == 1) def test_put_doesnt_double_wrap(self): self.pool.put(self.connection) conn = self.pool.get() self.assertTrue(not isinstance(conn._base, db_pool.PooledConnectionWrapper)) def test_bool(self): self.assertTrue(self.connection) self.connection.close() self.assertTrue(not self.connection) def fill_test_table(self, conn): curs = conn.cursor() for i in range(1000): curs.execute('insert into test_table (value_int) values (%s)' % i) conn.commit() def test_returns_immediately(self): self.pool = self.create_pool() conn = self.pool.get() self.set_up_test_table(conn) self.fill_test_table(conn) curs = conn.cursor() results = [] SHORT_QUERY = "select * from test_table" evt = coros.event() def a_query(): self.assert_cursor_works(curs) curs.execute(SHORT_QUERY) results.append(2) evt.send() evt2 = coros.event() api.spawn(a_query) results.append(1) 
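# at this point the spawned a_query() has not run: spawn() only schedules a
# coroutine, which first gets CPU when the current one yields (in evt.wait())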
self.assertEqual([1], results) evt.wait() self.assertEqual([1, 2], results) def test_connection_is_clean_after_put(self): self.pool = self.create_pool() conn = self.pool.get() self.set_up_test_table(conn) curs = conn.cursor() for i in range(10): curs.execute('insert into test_table (value_int) values (%s)' % i) # do not commit :-) self.pool.put(conn) del conn conn2 = self.pool.get() curs2 = conn2.cursor() for i in range(10): curs2.execute('insert into test_table (value_int) values (%s)' % i) conn2.commit() rows = curs2.execute("select * from test_table") # we should have only inserted them once self.assertEqual(10, rows) def test_visibility_from_other_connections(self): self.pool = self.create_pool(3) conn = self.pool.get() conn2 = self.pool.get() curs = conn.cursor() try: curs2 = conn2.cursor() rows2 = curs2.execute("insert into gargleblatz (a) values (%s)" % (314159)) self.assertEqual(rows2, 1) conn2.commit() selection_query = "select * from gargleblatz" rows2 = curs2.execute(selection_query) self.assertEqual(rows2, 1) del curs2 del conn2 # create a new connection, it should see the addition conn3 = self.pool.get() curs3 = conn3.cursor() rows3 = curs3.execute(selection_query) self.assertEqual(rows3, 1) # now, does the already-open connection see it? rows = curs.execute(selection_query) self.assertEqual(rows, 1) finally: # clean up my litter curs.execute("delete from gargleblatz where a=314159") conn.commit() def test_two_simultaneous_connections(self): """ This test is timing-sensitive. """ self.pool = self.create_pool(2) conn = self.pool.get() self.set_up_test_table(conn) self.fill_test_table(conn) curs = conn.cursor() conn2 = self.pool.get() self.set_up_test_table(conn2) self.fill_test_table(conn2) curs2 = conn2.cursor() results = [] LONG_QUERY = "select * from test_table" SHORT_QUERY = "select * from test_table where row_id <= 20" evt = coros.event() def long_running_query(): self.assert_cursor_works(curs) curs.execute(LONG_QUERY) results.append(1) evt.send() evt2 = coros.event() def short_running_query(): self.assert_cursor_works(curs2) curs2.execute(SHORT_QUERY) results.append(2) evt2.send() api.spawn(long_running_query) api.spawn(short_running_query) evt.wait() evt2.wait() results.sort() self.assertEqual([1, 2], results) def test_clear(self): self.pool = self.create_pool() self.pool.put(self.connection) self.pool.clear() self.assertEqual(len(self.pool.free_items), 0) def test_unwrap_connection(self): self.assertTrue(isinstance(self.connection, db_pool.GenericConnectionWrapper)) conn = self.pool._unwrap_connection(self.connection) self.assertTrue(not isinstance(conn, db_pool.GenericConnectionWrapper)) self.assertEqual(None, self.pool._unwrap_connection(None)) self.assertEqual(None, self.pool._unwrap_connection(1)) # testing duck typing here -- as long as the connection has a # _base attribute, it should be unwrappable x = Mock() x._base = 'hi' self.assertEqual('hi', self.pool._unwrap_connection(x)) def test_safe_close(self): self.pool._safe_close(self.connection, quiet=True) self.assertEqual(len(self.pool.free_items), 1) self.pool._safe_close(None) self.pool._safe_close(1) # now we're really going for 100% coverage x = Mock() def fail(): raise KeyboardInterrupt() x.close = fail self.assertRaises(KeyboardInterrupt, self.pool._safe_close, x) x = Mock() def fail2(): raise RuntimeError("if this line has been printed, the test succeeded") x.close = fail2 self.pool._safe_close(x, quiet=False) def test_zero_max_idle(self): self.pool = self.create_pool(max_size=2, max_idle=0) 
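# max_idle=0: a connection handed back to the pool is dropped on the spot
# rather than parked on the free list, as the assertions below verify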
self.connection = self.pool.get() self.connection.close() self.assertEqual(len(self.pool.free_items), 0) def test_zero_max_age(self): self.pool = self.create_pool(max_size=2, max_age=0) self.connection = self.pool.get() self.connection.close() self.assertEqual(len(self.pool.free_items), 0) def dont_test_max_idle(self): # This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. self.pool = self.create_pool(max_size=2, max_idle=0.02) self.connection = self.pool.get() self.connection.close() self.assertEqual(len(self.pool.free_items), 1) api.sleep(0.01) # not long enough to trigger the idle timeout self.assertEqual(len(self.pool.free_items), 1) self.connection = self.pool.get() self.connection.close() self.assertEqual(len(self.pool.free_items), 1) api.sleep(0.01) # idle timeout should have fired but done nothing self.assertEqual(len(self.pool.free_items), 1) self.connection = self.pool.get() self.connection.close() self.assertEqual(len(self.pool.free_items), 1) api.sleep(0.03) # long enough to trigger idle timeout for real self.assertEqual(len(self.pool.free_items), 0) def dont_test_max_idle_many(self): # This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. self.pool = self.create_pool(max_size=2, max_idle=0.02) self.connection, conn2 = self.pool.get(), self.pool.get() self.connection.close() api.sleep(0.01) self.assertEqual(len(self.pool.free_items), 1) conn2.close() self.assertEqual(len(self.pool.free_items), 2) api.sleep(0.02) # trigger cleanup of conn1 but not conn2 self.assertEqual(len(self.pool.free_items), 1) def dont_test_max_age(self): # This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. self.pool = self.create_pool(max_size=2, max_age=0.05) self.connection = self.pool.get() self.connection.close() self.assertEqual(len(self.pool.free_items), 1) api.sleep(0.01) # not long enough to trigger the age timeout self.assertEqual(len(self.pool.free_items), 1) self.connection = self.pool.get() self.connection.close() self.assertEqual(len(self.pool.free_items), 1) api.sleep(0.05) # long enough to trigger age timeout self.assertEqual(len(self.pool.free_items), 0) def dont_test_max_age_many(self): # This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. 
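# max_age caps a connection's total lifetime from creation (even while
# checked out), unlike max_idle, which only expires connections at rest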
self.pool = self.create_pool(max_size=2, max_age=0.15) self.connection, conn2 = self.pool.get(), self.pool.get() self.connection.close() self.assertEqual(len(self.pool.free_items), 1) api.sleep(0) # not long enough to trigger the age timeout self.assertEqual(len(self.pool.free_items), 1) api.sleep(0.2) # long enough to trigger age timeout self.assertEqual(len(self.pool.free_items), 0) conn2.close() # should not be added to the free items self.assertEqual(len(self.pool.free_items), 0) def test_connection_timeout(self): # use a nonexistent ip address -- this one is reserved by IANA self._auth['host'] = '192.0.2.1' pool = self.create_pool() self.assertRaises(db_pool.ConnectTimeout, pool.get) def test_waiters_get_woken(self): # verify that when there's someone waiting on an empty pool # and someone puts an immediately-closed connection back in # the pool that the waiter gets woken self.pool = self.create_pool(max_size=1, max_age=0) conn = self.pool.get() self.assertEqual(self.pool.free(), 0) self.assertEqual(self.pool.waiting(), 0) e = coros.event() def retrieve(pool, ev): c = pool.get() ev.send(c) api.spawn(retrieve, self.pool, e) api.sleep(0) # these two sleeps should advance the retrieve api.sleep(0) # coroutine until it's waiting in get() self.assertEqual(self.pool.free(), 0) self.assertEqual(self.pool.waiting(), 1) self.pool.put(conn) timer = api.exc_after(0.3, api.TimeoutError) conn = e.wait() timer.cancel() self.assertEqual(self.pool.free(), 0) self.assertEqual(self.pool.waiting(), 0) def dont_test_0_straight_benchmark(self): """ Benchmark; don't run unless you want to wait a while.""" import time iterations = 20000 c = self.connection.cursor() self.connection.commit() def bench(c): for i in range(iterations): c.execute('select 1') bench(c) # warm-up results = [] for i in range(3): start = time.time() bench(c) end = time.time() results.append(end-start) print("\n%u iterations took an average of %f seconds, (%s) in %s\n" % ( iterations, sum(results)/len(results), results, type(self))) def test_raising_create(self): # if the create() method raises an exception the pool should # not lose any connections self.pool = self.create_pool(max_size=1, module=RaisingDBModule()) self.assertRaises(RuntimeError, self.pool.get) self.assertEqual(self.pool.free(), 1) class RaisingDBModule(object): def connect(self, *args, **kw): raise RuntimeError() class TestTpoolConnectionPool(TestDBConnectionPool): def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout=0.5, module=None): if module is None: module = self._dbmodule return db_pool.TpooledConnectionPool(module, min_size=0, max_size=max_size, max_idle=max_idle, max_age=max_age, connect_timeout = connect_timeout, **self._auth) class TestSaranwrapConnectionPool(TestDBConnectionPool): def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout= 0.5, module=None): if module is None: module = self._dbmodule return db_pool.SaranwrappedConnectionPool(module, min_size=0, max_size=max_size, max_idle=max_idle, max_age=max_age, connect_timeout=connect_timeout, **self._auth) def test_raising_create(self): # *TODO: this fails because of saranwrap's unwillingness to # wrap objects in tests, but it should be fixable pass class TestRawConnectionPool(TestDBConnectionPool): def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout= 0.5, module=None): if module is None: module = self._dbmodule return db_pool.RawConnectionPool(module, min_size=0, max_size=max_size, max_idle=max_idle, max_age=max_age, 
connect_timeout=connect_timeout, **self._auth) def test_connection_timeout(self): pass # not gonna work for raw connections because they're not nonblocking class TestMysqlConnectionPool(object): def setUp(self): import MySQLdb self._dbmodule = MySQLdb try: import simplejson import os.path auth_utf8 = simplejson.load(open(os.path.join(os.path.dirname(__file__), 'auth.json'))) # have to convert unicode objects to str objects because mysqldb is dum self._auth = dict([(str(k), str(v)) for k, v in list(auth_utf8.items())]) except (IOError, ImportError) as e: self._auth = {'host': 'localhost','user': 'root','passwd': '','db': 'persist0'} super(TestMysqlConnectionPool, self).setUp() def tearDown(self): pass def create_db(self): auth = self._auth.copy() try: self.drop_db() except Exception: pass dbname = auth.pop('db') db = self._dbmodule.connect(**auth).cursor() db.execute("create database "+dbname) db.close() del db def drop_db(self): db = self._dbmodule.connect(**self._auth).cursor() db.execute("drop database "+self._auth['db']) db.close() del db # for some reason the tpool test hangs if run after the saranwrap test class Test01MysqlTpool(TestMysqlConnectionPool, TestTpoolConnectionPool, TestCase): pass class Test02MysqlSaranwrap(TestMysqlConnectionPool, TestSaranwrapConnectionPool, TestCase): pass class Test03MysqlRaw(TestMysqlConnectionPool, TestRawConnectionPool, TestCase): pass if __name__ == '__main__': try: import MySQLdb except ImportError: print("Unable to import MySQLdb, skipping db_pool_test.") else: main() else: import MySQLdb diff --git a/greentest/generate_report.py b/greentest/generate_report.py index 1f7acf2..df0b7b0 100755 --- a/greentest/generate_report.py +++ b/greentest/generate_report.py @@ -1,235 +1,235 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
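The Test01MysqlTpool/Test02MysqlSaranwrap/Test03MysqlRaw classes above reuse one suite across three pool flavors by mixin composition. A reduced sketch of that pattern (hypothetical names, stand-in factory):

import unittest

class SuiteMixin:                    # shared tests; cf. TestDBConnectionPool
    def test_create(self):
        self.assertTrue(self.create_pool() is not None)

class RawFlavor:                     # per-flavor factory; cf. TestRawConnectionPool
    def create_pool(self):
        return object()              # stand-in for db_pool.RawConnectionPool(...)

class TestRawFlavor(RawFlavor, SuiteMixin, unittest.TestCase):
    pass                             # only the concrete class derives TestCase,
                                     # so unittest collects each combination once

if __name__ == '__main__':
    unittest.main()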
import sys import os import sqlite3 import glob REPO_URL = 'http://bitbucket.org/denis/eventlib' hubs_order = ['poll', 'selects', 'libevent', 'libev', 'twistedr/selectreactor', 'twistedr/pollreactor', 'twistedr/epollreactor'] def make_table(database): c = sqlite3.connect(database) res = c.execute(('select command_record.id, testname, hub, runs, errors, fails, ' 'timeouts, exitcode, stdout from parsed_command_record join ' 'command_record on parsed_command_record.id=command_record.id ')).fetchall() table = {} # testname -> hub -> test_result (runs, errors, fails, timeouts) tests = set() for id, testname, hub, runs, errors, fails, timeouts, exitcode, stdout in res: tests.add(testname) test_result = TestResult(runs, errors, fails, timeouts, exitcode, id, stdout) table.setdefault(testname, {})[hub] = test_result return table, sorted(tests) def calc_hub_stats(table): hub_stats = {} # hub -> cumulative test_result for testname in table: for hub in table[testname]: test_result = table[testname][hub] hub_stats.setdefault(hub, TestResult(0,0,0,0)).__iadd__(test_result) hubs = list(hub_stats.items()) hub_names = sorted(hub_stats.keys()) def get_order(hub): try: return hubs_order.index(hub) except ValueError: return 100 + hub_names.index(hub) hubs.sort(key=lambda hub_stats1: get_order(hub_stats1[0])) return hub_stats, [x[0] for x in hubs] class TestResult: def __init__(self, runs, errors, fails, timeouts, exitcode=None, id=None, output=None): self.runs = max(runs, 0) self.errors = max(errors, 0) self.fails = max(fails, 0) self.timeouts = max(timeouts, 0) self.exitcode = exitcode self.id = id self.output = output @property def passed(self): return max(0, self.runs - self.errors - self.fails) @property def failed(self): return self.errors + self.fails @property def total(self): return self.runs + self.timeouts @property def percentage(self): return float(self.passed) / self.total def __iadd__(self, other): self.runs += other.runs self.errors += other.errors self.fails += other.fails self.timeouts += other.timeouts if self.exitcode != other.exitcode: self.exitcode = None self.id = None self.output = None def color(self): if self.id is None: return 'white' if self.timeouts or self.exitcode in [7, 9, 10]: return 'red' elif self.errors or self.fails or self.exitcode: return 'yellow' else: return '"#72ff75"' def warnings(self): r = [] if not self.failed and not self.timeouts: if self.exitcode in [7, 9, 10]: r += ['TIMEOUT'] if self.exitcode: r += ['exitcode=%s' % self.exitcode] if self.output is not None: output = self.output.lower() warning = output.count('warning') if warning: r += ['%s warnings' % warning] tracebacks = output.count('traceback') if tracebacks: r += ['%s tracebacks' % tracebacks] return r def text(self): errors = [] if self.fails: errors += ['%s failed' % self.fails] if self.errors: errors += ['%s raised' % self.errors] if self.timeouts: errors += ['%s timeout' % self.timeouts] errors += self.warnings() if self.id is None: errors += ['
<hr>%s total' % self.total]
        return '\n'.join(["%s passed" % self.passed] + errors).replace(' ', '&nbsp;')

    # shorter passed/failed/raised/timeout
    def text_short(self):
        r = '%s/%s/%s' % (self.passed, self.failed, self.timeouts)
        if self.warnings():
            r += '\n' + '\n'.join(self.warnings()).replace(' ', '&nbsp;')
        return r

    def format(self):
        text = self.text().replace('\n', '<br>\n')
        if self.id is None:
            valign = 'bottom'
        else:
            text = '<a href="%s.txt">%s</a>' % (self.id, text)
            valign = 'center'
        return '<td valign="%s" bgcolor=%s>%s</td>' % (valign, self.color(), text)

def format_testname(changeset, test):
    return '<a href="%s/src/%s/greentest/%s">%s</a>' % (REPO_URL, changeset, test, test)

def format_table(table, hubs, tests, hub_stats, changeset):
    r = '<table border=1>\n<tr>\n<td/>\n'
    for hub in hubs:
        r += '<td>%s</td>\n' % hub
    r += '</tr>\n'
    r += '<tr><td>Total</td>'
    for hub in hubs:
        test_result = hub_stats.get(hub)
        if test_result is None:
            r += '<td>no data</td>'
        else:
            r += test_result.format() + '\n'
    r += '</tr>'
    r += '<tr><td colspan=%s/></tr>' % (len(hubs)+1)
    for test in tests:
        r += '<tr><td>%s</td>' % format_testname(changeset, test)
        for hub in hubs:
            test_result = table[test].get(hub)
            if test_result is None:
                r += '<td>no data</td>'
            else:
                r += test_result.format() + '\n'
        r += '</tr>'
    r += '</table>'
    return r

def format_header(rev, changeset, pyversion):
    result = '<p>'
    if REPO_URL is None:
        result += 'Eventlet changeset %s: %s' % (rev, changeset)
    else:
        url = '%s/changeset/%s' % (REPO_URL, changeset)
        result += 'Eventlet changeset <a href="%s">%s: %s</a>' % (url, rev, changeset)
    result += '<br>Python version: %s<br><br>
' % pyversion return result def format_html(table, rev, changeset, pyversion): r = '' r += format_header(rev, changeset, pyversion) r += table r += '' return r def generate_raw_results(path, database): c = sqlite3.connect(database) res = c.execute('select id, stdout from command_record').fetchall() for id, out in res: - file(os.path.join(path, '%s.txt' % id), 'w').write(out.encode('utf-8')) + open(os.path.join(path, '%s.txt' % id), 'w').write(out) sys.stderr.write('.') sys.stderr.write('\n') def main(db): full_changeset = '.'.join(db.split('.')[1:-1]) rev, changeset, pyversion = full_changeset.split('_') table, tests = make_table(db) hub_stats, hubs = calc_hub_stats(table) report = format_html(format_table(table, hubs, tests, hub_stats, changeset), rev, changeset, pyversion) path = '../htmlreports/%s' % full_changeset try: os.makedirs(path) except OSError as ex: if 'File exists' not in str(ex): raise - file(path + '/index.html', 'w').write(report) + open(path + '/index.html', 'w').write(report) generate_raw_results(path, db) if __name__=='__main__': if not sys.argv[1:]: latest_db = sorted(glob.glob('results.*.db'), key=lambda f: os.stat(f).st_mtime)[-1] print(latest_db) sys.argv.append(latest_db) for db in sys.argv[1:]: main(db) diff --git a/greentest/greenio_test.py b/greentest/greenio_test.py index 86fc508..1b07e0b 100644 --- a/greentest/greenio_test.py +++ b/greentest/greenio_test.py @@ -1,101 +1,101 @@ # Copyright (c) 2006-2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
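The file() to open() changes above also drop the explicit .encode('utf-8'): in Python 3 a text-mode file accepts str and encodes on write. A freestanding illustration (hypothetical path; the patch itself relies on the default encoding):

import os
import tempfile

path = os.path.join(tempfile.gettempdir(), 'example.txt')   # hypothetical path
# Python 2 spelling: file(path, 'w').write(s.encode('utf-8'))
with open(path, 'w', encoding='utf-8') as f:
    f.write('r\u00e9sultat')        # str in; UTF-8 bytes land on disk
os.unlink(path)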
from unittest import TestCase, main from eventlib import api import socket # TODO try and reuse unit tests from within Python itself class TestGreenIo(TestCase): def test_close_with_makefile(self): def accept_close_early(listener): # verify that the makefile and the socket are truly independent # by closing the socket prior to using the made file try: conn, addr = listener.accept() fd = conn.makeGreenFile() conn.close() fd.write('hello\n') fd.close() self.assertRaises(socket.error, fd.write, 'a') self.assertRaises(socket.error, conn.send, 'b') finally: listener.close() def accept_close_late(listener): # verify that the makefile and the socket are truly independent # by closing the made file and then sending a character try: conn, addr = listener.accept() fd = conn.makeGreenFile() fd.write('hello') fd.close() conn.send('\n') conn.close() self.assertRaises(socket.error, fd.write, 'a') self.assertRaises(socket.error, conn.send, 'b') finally: listener.close() def did_it_work(server): client = api.connect_tcp(('127.0.0.1', server.getsockname()[1])) fd = client.makeGreenFile() client.close() - assert fd.readline() == 'hello\n' + assert fd.readline() == b'hello\n' assert fd.read() == '' fd.close() server = api.tcp_listener(('0.0.0.0', 0)) killer = api.spawn(accept_close_early, server) did_it_work(server) api.kill(killer) server = api.tcp_listener(('0.0.0.0', 0)) killer = api.spawn(accept_close_late, server) did_it_work(server) api.kill(killer) def test_del_closes_socket(self): timer = api.exc_after(0.5, api.TimeoutError) def accept_once(listener): # delete/overwrite the original conn # object, only keeping the file object around # closing the file object should close everything try: conn, addr = listener.accept() conn = conn.makeGreenFile() conn.write('hello\n') conn.close() self.assertRaises(socket.error, conn.write, 'a') finally: listener.close() server = api.tcp_listener(('0.0.0.0', 0)) killer = api.spawn(accept_once, server) client = api.connect_tcp(('127.0.0.1', server.getsockname()[1])) fd = client.makeGreenFile() client.close() - assert fd.read() == 'hello\n' + assert fd.read() == b'hello\n' assert fd.read() == '' timer.cancel() if __name__ == '__main__': main() diff --git a/greentest/parse_results.py b/greentest/parse_results.py index 267415c..dba43ea 100755 --- a/greentest/parse_results.py +++ b/greentest/parse_results.py @@ -1,128 +1,130 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
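The greenio assertions above switch their expected values to bytes literals because Python 3 sockets (and file objects made from them) traffic in bytes; the neighbouring fd.read() == '' assertions may need the same b'' treatment if green files also return bytes at EOF. A stdlib-only round trip showing the same behaviour, without eventlib's green wrappers:

import socket

a, b = socket.socketpair()
a.sendall(b'hello\n')
a.close()
f = b.makefile('rb')
assert f.readline() == b'hello\n'   # bytes literal, as in the test above
assert f.read() == b''              # EOF reads are empty *bytes*, not ''
f.close()
b.close()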
import sys import os import traceback import sqlite3 import re import glob def parse_stdout(s): + if isinstance(s, bytes): + s = s.decode('utf-8') argv = re.search('^===ARGV=(.*?)$', s, re.M).group(1) argv = argv.split() testname = argv[-1] del argv[-1] hub = None reactor = None while argv: if argv[0]=='--hub': hub = argv[1] del argv[0] del argv[0] elif argv[0]=='--reactor': reactor = argv[1] del argv[0] del argv[0] else: del argv[0] if reactor is not None: hub += '/%s' % reactor return testname, hub unittest_delim = '----------------------------------------------------------------------' def parse_unittest_output(s): s = s[s.rindex(unittest_delim)+len(unittest_delim):] num = int(re.search('^Ran (\d+) test.*?$', s, re.M).group(1)) ok = re.search('^OK$', s, re.M) error, fail, timeout = 0, 0, 0 failed_match = re.search(r'^FAILED \((?:failures=(?P\d+))?,? ?(?:errors=(?P\d+))?\)$', s, re.M) ok_match = re.search('^OK$', s, re.M) if failed_match: assert not ok_match, (ok_match, s) fail = failed_match.group('f') error = failed_match.group('e') fail = int(fail or '0') error = int(error or '0') else: assert ok_match, repr(s) timeout_match = re.search('^===disabled because of timeout: (\d+)$', s, re.M) if timeout_match: timeout = int(timeout_match.group(1)) return num, error, fail, timeout def main(db): c = sqlite3.connect(db) c.execute('''create table if not exists parsed_command_record (id integer not null unique, testname text, hub text, runs integer, errors integer, fails integer, timeouts integer, error_names text, fail_names text, timeout_names text)''') c.commit() parse_error = 0 SQL = ('select command_record.id, command, stdout, exitcode from command_record ' 'where not exists (select * from parsed_command_record where ' 'parsed_command_record.id=command_record.id)') for row in c.execute(SQL).fetchall(): id, command, stdout, exitcode = row try: testname, hub = parse_stdout(stdout) if unittest_delim in stdout: runs, errors, fails, timeouts = parse_unittest_output(stdout) else: if exitcode == 0: runs, errors, fails, timeouts = 1,0,0,0 if exitcode == 7: runs, errors, fails, timeouts = 0,0,0,1 elif exitcode: runs, errors, fails, timeouts = 1,1,0,0 except Exception: parse_error += 1 sys.stderr.write('Failed to parse id=%s\n' % id) print(repr(stdout)) traceback.print_exc() else: print(id, hub, testname, runs, errors, fails, timeouts) c.execute('insert into parsed_command_record ' '(id, testname, hub, runs, errors, fails, timeouts) ' 'values (?, ?, ?, ?, ?, ?, ?)', (id, testname, hub, runs, errors, fails, timeouts)) c.commit() if __name__=='__main__': if not sys.argv[1:]: latest_db = sorted(glob.glob('results.*.db'), key=lambda f: os.stat(f).st_mtime)[-1] print(latest_db) sys.argv.append(latest_db) for db in sys.argv[1:]: main(db) - exec(compile(open('generate_report.py', "rb").read(), 'generate_report.py', 'exec')) + exec(compile(open('generate_report.py').read(), 'generate_report.py', 'exec')) diff --git a/greentest/record_results.py b/greentest/record_results.py index 943d0d7..660cb70 100755 --- a/greentest/record_results.py +++ b/greentest/record_results.py @@ -1,81 +1,82 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies 
of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """Run the program and record stdout/stderr/exitcode into the database results.rev_changeset.db Usage: %prog program [args] """ import sys import os import codecs +import tempfile try: import sqlite3 except ImportError: import pysqlite2.dbapi2 as sqlite3 import warnings from greentest import disabled_marker warnings.simplefilter('ignore') PYTHON_VERSION = '%s.%s.%s' % sys.version_info[:3] COMMAND_CHANGESET = r"hg log -r tip | grep changeset" def record(changeset, argv, stdout, returncode): c = sqlite3.connect('results.%s_%s.db' % (changeset, PYTHON_VERSION)) c.execute('''create table if not exists command_record (id integer primary key autoincrement, command text, stdout text, exitcode integer)''') c.execute('insert into command_record (command, stdout, exitcode)' 'values (?, ?, ?)', (repr(argv), stdout, returncode)) c.commit() def main(): argv = sys.argv[1:] if argv[0]=='-d': debug = True del argv[0] else: debug = False changeset = os.popen(COMMAND_CHANGESET).readlines()[0].replace('changeset:', '').strip().replace(':', '_') - output_name = os.tmpnam() - arg = ' '.join(argv) + ' &> %s' % output_name - print(arg) + output_fd = tempfile.NamedTemporaryFile(delete=False) + output_name = output_fd.name + arg = ' '.join(argv) + ' > %s' % output_name returncode = os.system(arg)>>8 print(arg, 'finished with code', returncode) stdout = codecs.open(output_name, mode='r', encoding='utf-8', errors='replace').read().replace('\x00', '?') if not debug: if returncode==1: pass elif returncode==8 and disabled_marker in stdout: pass else: record(changeset, argv, stdout, returncode) - os.unlink(output_name) + #os.unlink(output_name) sys.exit(returncode) if __name__=='__main__': main() diff --git a/greentest/runall.py b/greentest/runall.py index 2be86f0..d3e7743 100755 --- a/greentest/runall.py +++ b/greentest/runall.py @@ -1,172 +1,172 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """Run tests for different configurations (hub/reactor)""" import sys import os import random from glob import glob from optparse import OptionParser, Option from copy import copy from time import time -from .with_eventlet import import_reactor +from with_eventlet import import_reactor first_hubs = ['poll', 'selects', 'twistedr'] first_reactors = ['selectreactor', 'pollreactor'] COMMAND = sys.executable + ' ./record_results.py ' + sys.executable + ' ./with_timeout.py ./with_eventlet.py %(setup)s %(test)s' PARSE_PERIOD = 10 # the following aren't in the default list unless --all option present NOT_HUBS = set() NOT_REACTORS = set(['wxreactor', 'glib2reactor', 'gtk2reactor']) NOT_TESTS = set(['db_pool_test.py']) def w(s): sys.stderr.write("%s\n" % (s, )) def enum_hubs(): from eventlib.api import use_hub hubs = glob('../eventlib/hubs/*.py') hubs = [os.path.basename(h)[:-3] for h in hubs] hubs = [h for h in hubs if h[:1]!='_'] hubs = set(hubs) hubs.discard('hub') hubs -= NOT_HUBS result = [] for hub in hubs: try: use_hub(hub) except Exception as ex: print('Skipping hub %s: %s' % (hub, ex)) else: result.append(hub) return result def enum_reactors(): try: import twisted except ImportError: return [] p = os.path.join(os.path.dirname(twisted.__file__), 'internet', '*?reactor.py') files = glob(p) all_reactors = [os.path.basename(f[:-3]) for f in files] all_reactors = set(all_reactors) - NOT_REACTORS selected_reactors = [] for reactor in all_reactors: try: import_reactor(reactor) except Exception as ex: print('Skipping reactor %s: %s' % (reactor, ex)) else: selected_reactors.append(reactor) return selected_reactors def enum_tests(): tests = [] tests += glob('test_*.py') tests += glob('*_test.py') tests = set(tests) - NOT_TESTS - set(['test_support.py']) return tests def cmd(program): w(program) res = os.system(program)>>8 w(res) if res==1: sys.exit(1) return res def check_stringlist(option, opt, value): return value.split(',') class MyOption(Option): TYPES = Option.TYPES + ("stringlist",) TYPE_CHECKER = copy(Option.TYPE_CHECKER) TYPE_CHECKER["stringlist"] = check_stringlist def main(): global NOT_HUBS, NOT_REACTORS, NOT_TESTS parser = OptionParser(option_class=MyOption) parser.add_option('-u', '--hubs', type='stringlist') parser.add_option('-r', '--reactors', type='stringlist') parser.add_option('--ignore-hubs', type='stringlist', default=[]) parser.add_option('--ignore-reactors', type='stringlist', default=[]) parser.add_option('--ignore-tests', type='stringlist', default=[]) parser.add_option('-s', '--show', help='show default values and exit', action='store_true', default=False) parser.add_option('-a', '--all', action='store_true', default=False) options, args = parser.parse_args() options.tests = args or None if options.all: NOT_HUBS = NOT_REACTORS = NOT_TESTS = set() if options.hubs is None: options.hubs = enum_hubs() if options.reactors is None: options.reactors = enum_reactors() if options.tests is None: options.tests = enum_tests() tests = [] for t in options.tests: tests.extend(glob(t)) options.tests = tests options.hubs = list(set(options.hubs) - set(options.ignore_hubs)) options.reactors = list(set(options.reactors) - set(options.ignore_reactors)) options.tests = list(set(options.tests) - set(options.ignore_tests)) 
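# order matters below: shuffle for variety between runs, then sort the
# preferred hubs/reactors (first_hubs/first_reactors) to the front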
random.shuffle(options.hubs) options.hubs.sort(key=first_hubs.__contains__, reverse=True) random.shuffle(options.reactors) options.reactors.sort(key=first_reactors.__contains__, reverse=True) random.shuffle(options.tests) print('hubs: %s' % ','.join(options.hubs)) print('reactors: %s' % ','.join(options.reactors)) print('tests: %s' % ','.join(options.tests)) if options.show: return setups = [] for hub in options.hubs: if hub == 'twistedr': for reactor in options.reactors: setups.append('--hub twistedr --reactor %s' % reactor) else: setups.append('--hub %s' % hub) last_time = time() for setup in setups: w(setup) for test in options.tests: w(test) cmd(COMMAND % locals()) if time()-last_time>PARSE_PERIOD: os.system('./parse_results.py') last_time = time() os.system('./parse_results.py') if __name__=='__main__': main() diff --git a/greentest/saranwrap_test.py b/greentest/saranwrap_test.py index 35e3090..28f3938 100755 --- a/greentest/saranwrap_test.py +++ b/greentest/saranwrap_test.py @@ -1,363 +1,363 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # @file test_saranwrap.py # @brief Test cases for saranwrap. # # Copyright (c) 2007, Linden Research, Inc. # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
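runall.py's comma-separated option type above is self-contained enough to check in isolation; the same wiring, reduced to a snippet (so that e.g. ./runall.py --hubs poll,selects yields a list):

from copy import copy
from optparse import Option, OptionParser

def check_stringlist(option, opt, value):
    return value.split(',')

class MyOption(Option):
    TYPES = Option.TYPES + ("stringlist",)
    TYPE_CHECKER = copy(Option.TYPE_CHECKER)
    TYPE_CHECKER["stringlist"] = check_stringlist

parser = OptionParser(option_class=MyOption)
parser.add_option('-u', '--hubs', type='stringlist')
options, _ = parser.parse_args(['--hubs', 'poll,selects'])
assert options.hubs == ['poll', 'selects']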
from eventlib import api, saranwrap from eventlib.pool import Pool import os import sys import tempfile import time import unittest import uuid # random test stuff def list_maker(): return [0,1,2] one = 1 two = 2 three = 3 class CoroutineCallingClass(object): def __init__(self): self._my_dict = {} def run_coroutine(self): api.spawn(self._add_random_key) def _add_random_key(self): self._my_dict['random'] = 'yes, random' def get_dict(self): return self._my_dict class TestSaranwrap(unittest.TestCase): def assert_server_exists(self, prox): self.assertTrue(saranwrap.status(prox)) prox.foo = 0 self.assertEqual(0, prox.foo) def test_wrap_tuple(self): my_tuple = (1, 2) prox = saranwrap.wrap(my_tuple) self.assertEqual(prox[0], 1) self.assertEqual(prox[1], 2) self.assertEqual(len(my_tuple), 2) def test_wrap_string(self): my_object = "whatever" prox = saranwrap.wrap(my_object) self.assertEqual(str(my_object), str(prox)) self.assertEqual(len(my_object), len(prox)) self.assertEqual(my_object.join(['a', 'b']), prox.join(['a', 'b'])) def test_wrap_uniterable(self): # here we're treating the exception as just a normal class prox = saranwrap.wrap(FloatingPointError()) def index(): prox[0] def key(): prox['a'] self.assertRaises(IndexError, index) self.assertRaises(TypeError, key) def test_wrap_dict(self): my_object = {'a':1} prox = saranwrap.wrap(my_object) self.assertEqual('a', list(prox.keys())[0]) self.assertEqual(1, prox['a']) self.assertEqual(str(my_object), str(prox)) self.assertEqual('saran:' + repr(my_object), repr(prox)) self.assertEqual('saran:' + repr(my_object), repr(prox)) def test_wrap_module_class(self): prox = saranwrap.wrap(uuid) self.assertEqual(saranwrap.Proxy, type(prox)) id = prox.uuid4() self.assertEqual(id.get_version(), uuid.uuid4().get_version()) self.assertTrue(repr(prox.uuid4)) def test_wrap_eq(self): prox = saranwrap.wrap(uuid) id1 = prox.uuid4() id2 = prox.UUID(str(id1)) self.assertEqual(id1, id2) id3 = prox.uuid4() self.assertTrue(id1 != id3) def test_wrap_nonzero(self): prox = saranwrap.wrap(uuid) id1 = prox.uuid4() self.assertTrue(bool(id1)) prox2 = saranwrap.wrap([1, 2, 3]) self.assertTrue(bool(prox2)) def test_multiple_wraps(self): prox1 = saranwrap.wrap(uuid) prox2 = saranwrap.wrap(uuid) x1 = prox1.uuid4() x2 = prox1.uuid4() del x2 x3 = prox2.uuid4() def test_dict_passthru(self): prox = saranwrap.wrap(uuid) x = prox.uuid4() self.assertEqual(type(x.__dict__), saranwrap.ObjectProxy) # try it all on one line just for the sake of it self.assertEqual(type(saranwrap.wrap(uuid).uuid4().__dict__), saranwrap.ObjectProxy) def test_is_value(self): server = saranwrap.Server(None, None, None) self.assertTrue(server.is_value(None)) def test_wrap_getitem(self): prox = saranwrap.wrap([0,1,2]) self.assertEqual(prox[0], 0) def test_wrap_setitem(self): prox = saranwrap.wrap([0,1,2]) prox[1] = 2 self.assertEqual(prox[1], 2) def test_raising_exceptions(self): prox = saranwrap.wrap(uuid) def nofunc(): prox.never_name_a_function_like_this() self.assertRaises(AttributeError, nofunc) def test_raising_weird_exceptions(self): # the recursion is killing me! 
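# the saranwrap module is itself wrapped here, so the helper functions used
# below (raise_a_weird_error and friends) execute in the spawned child process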
prox = saranwrap.wrap(saranwrap) try: prox.raise_a_weird_error() self.assertTrue(False) except: import sys ex = sys.exc_info()[0] self.assertEqual(ex, "oh noes you can raise a string") self.assert_server_exists(prox) def test_unpicklable_server_exception(self): prox = saranwrap.wrap(saranwrap) def unpickle(): prox.raise_an_unpicklable_error() self.assertRaises(saranwrap.UnrecoverableError, unpickle) # It's basically dead #self.assert_server_exists(prox) def test_pickleable_server_exception(self): prox = saranwrap.wrap(saranwrap) def fperror(): prox.raise_standard_error() self.assertRaises(FloatingPointError, fperror) self.assert_server_exists(prox) def test_print_does_not_break_wrapper(self): prox = saranwrap.wrap(saranwrap) prox.print_string('hello') self.assert_server_exists(prox) def test_stderr_does_not_break_wrapper(self): prox = saranwrap.wrap(saranwrap) prox.err_string('goodbye') self.assert_server_exists(prox) def assertLessThan(self, a, b): self.assertTrue(a < b, "%s is not less than %s" % (a, b)) def test_status(self): prox = saranwrap.wrap(time) a = prox.gmtime(0) status = saranwrap.status(prox) self.assertEqual(status['object_count'], 1) self.assertEqual(status['next_id'], 2) self.assertTrue(status['pid']) # can't guess what it will be # status of an object should be the same as the module self.assertEqual(saranwrap.status(a), status) # create a new one then immediately delete it prox.gmtime(1) is_id = prox.ctime(1) # sync up deletes status = saranwrap.status(prox) self.assertEqual(status['object_count'], 1) self.assertEqual(status['next_id'], 3) prox2 = saranwrap.wrap(uuid) self.assertTrue(status['pid'] != saranwrap.status(prox2)['pid']) def test_del(self): prox = saranwrap.wrap(time) delme = prox.gmtime(0) status_before = saranwrap.status(prox) #print status_before['objects'] del delme # need to do an access that doesn't create an object # in order to sync up the deleted objects prox.ctime(1) status_after = saranwrap.status(prox) #print status_after['objects'] self.assertLessThan(status_after['object_count'], status_before['object_count']) def test_contains(self): prox = saranwrap.wrap({'a':'b'}) self.assertTrue('a' in prox) self.assertTrue('x' not in prox) def test_variable_and_keyword_arguments_with_function_calls(self): import optparse prox = saranwrap.wrap(optparse) parser = prox.OptionParser() z = parser.add_option('-n', action='store', type='string', dest='n') opts,args = parser.parse_args(["-nfoo"]) self.assertEqual(opts.n, 'foo') def test_original_proxy_going_out_of_scope(self): def make_uuid(): prox = saranwrap.wrap(uuid) # after this function returns, prox should fall out of scope return prox.uuid4() tid = make_uuid() self.assertEqual(tid.get_version(), uuid.uuid4().get_version()) def make_list(): from greentest import saranwrap_test prox = saranwrap.wrap(saranwrap_test.list_maker) # after this function returns, prox should fall out of scope return prox() proxl = make_list() self.assertEqual(proxl[2], 2) def test_status_of_none(self): try: saranwrap.status(None) self.assertTrue(False) except AttributeError as e: pass def test_not_inheriting_pythonpath(self): # construct a fake module in the temp directory temp_dir = tempfile.mkdtemp("saranwrap_test") fp = open(os.path.join(temp_dir, "jitar_hero.py"), "w") fp.write("""import os, sys pypath = os.environ['PYTHONPATH'] sys_path = sys.path""") fp.close() # this should fail because we haven't stuck the temp_dir in our path yet prox = saranwrap.wrap_module('jitar_hero') import pickle try: prox.pypath self.fail() except 
pickle.UnpicklingError: pass # now try to saranwrap it sys.path.append(temp_dir) try: import jitar_hero prox = saranwrap.wrap(jitar_hero) self.assertTrue(prox.pypath.count(temp_dir)) self.assertTrue(prox.sys_path.count(temp_dir)) finally: import shutil shutil.rmtree(temp_dir) sys.path.remove(temp_dir) def test_contention(self): from greentest import saranwrap_test prox = saranwrap.wrap(saranwrap_test) pool = Pool(max_size=4) waiters = [] waiters.append(pool.execute(lambda: self.assertEqual(prox.one, 1))) waiters.append(pool.execute(lambda: self.assertEqual(prox.two, 2))) waiters.append(pool.execute(lambda: self.assertEqual(prox.three, 3))) for waiter in waiters: waiter.wait() def test_copy(self): import copy compound_object = {'a':[1,2,3]} prox = saranwrap.wrap(compound_object) def make_assertions(copied): self.assertTrue(isinstance(copied, dict)) self.assertTrue(isinstance(copied['a'], list)) self.assertEqual(copied, compound_object) self.assertNotEqual(id(compound_object), id(copied)) make_assertions(copy.copy(prox)) make_assertions(copy.deepcopy(prox)) def test_list_of_functions(self): return # this test is known to fail, we can implement it sometime in the future if we wish from greentest import saranwrap_test prox = saranwrap.wrap([saranwrap_test.list_maker]) self.assertEqual(list_maker(), prox[0]()) def test_under_the_hood_coroutines(self): # so, we want to write a class which uses a coroutine to call # a function. Then we want to saranwrap that class, have # the object call the coroutine and verify that it ran from greentest import saranwrap_test mod_proxy = saranwrap.wrap(saranwrap_test) obj_proxy = mod_proxy.CoroutineCallingClass() obj_proxy.run_coroutine() # sleep for a bit to make sure out coroutine ran by the time # we check the assert below api.sleep(0.1) self.assertTrue( 'random' in obj_proxy.get_dict(), 'Coroutine in saranwrapped object did not run') def test_child_process_death(self): prox = saranwrap.wrap({}) pid = saranwrap.getpid(prox) self.assertEqual(os.kill(pid, 0), None) # assert that the process is running del prox # removing all references to the proxy should kill the child process api.sleep(0.1) # need to let the signal handler run self.assertRaises(OSError, os.kill, pid, 0) # raises OSError if pid doesn't exist def test_detection_of_server_crash(self): # make the server crash here pass def test_equality_with_local_object(self): # we'll implement this if there's a use case for it pass def test_non_blocking(self): # here we test whether it's nonblocking pass if __name__ == '__main__': unittest.main() diff --git a/greentest/test_socket.py b/greentest/test_socket.py index f477868..ccf79d3 100755 --- a/greentest/test_socket.py +++ b/greentest/test_socket.py @@ -1,1030 +1,1030 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 import unittest from greentest import test_support from eventlib.green import socket from eventlib.green import select from eventlib.green import time from eventlib.green import thread, threading import queue import sys import array from weakref import proxy import signal PORT = 50007 HOST = 'localhost' -MSG = 'Michael Gilfix was here\n' +MSG = b'Michael Gilfix was here\n' class SocketTCPTest(unittest.TestCase): def setUp(self): self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) global PORT PORT = test_support.bind_port(self.serv, HOST, PORT) self.serv.listen(1) def tearDown(self): self.serv.close() self.serv = None class SocketUDPTest(unittest.TestCase): def setUp(self): 
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) global PORT PORT = test_support.bind_port(self.serv, HOST, PORT) def tearDown(self): self.serv.close() self.serv = None class ThreadableTest: """Threadable Test class The ThreadableTest class makes it easy to create a threaded client/server pair from an existing unit test. To create a new threaded class from an existing unit test, use multiple inheritance: class NewClass (OldClass, ThreadableTest): pass This class defines two new fixture functions with obvious purposes for overriding: clientSetUp () clientTearDown () Any new test functions within the class must then define tests in pairs, where the test name is preceeded with a '_' to indicate the client portion of the test. Ex: def testFoo(self): # Server portion def _testFoo(self): # Client portion Any exceptions raised by the clients during their tests are caught and transferred to the main thread to alert the testing framework. Note, the server setup function cannot call any blocking functions that rely on the client thread during setup, unless serverExplicityReady() is called just before the blocking call (such as in setting up a client/server connection and performing the accept() in setUp(). """ def __init__(self): # Swap the true setup function self.__setUp = self.setUp self.__tearDown = self.tearDown self.setUp = self._setUp self.tearDown = self._tearDown def serverExplicitReady(self): """This method allows the server to explicitly indicate that it wants the client thread to proceed. This is useful if the server is about to execute a blocking routine that is dependent upon the client thread during its setup routine.""" self.server_ready.set() def _setUp(self): self.server_ready = threading.Event() self.client_ready = threading.Event() self.done = threading.Event() self.queue = queue.Queue(1) # Do some munging to start the client test. 
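# self.id() is a dotted path ending in the test method name; keep the part
# after the last '.' and pair it with the '_'-prefixed client-side method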
methodname = self.id() i = methodname.rfind('.') methodname = methodname[i+1:] test_method = getattr(self, '_' + methodname) self.client_thread = thread.start_new_thread( self.clientRun, (test_method,)) self.__setUp() if not self.server_ready.isSet(): self.server_ready.set() self.client_ready.wait() def _tearDown(self): self.__tearDown() self.done.wait() if not self.queue.empty(): msg = self.queue.get() self.fail(msg) def clientRun(self, test_func): self.server_ready.wait() self.client_ready.set() self.clientSetUp() if not callable(test_func): raise TypeError("test_func must be a callable function") try: test_func() except Exception as strerror: self.queue.put(strerror) self.clientTearDown() def clientSetUp(self): raise NotImplementedError("clientSetUp must be implemented.") def clientTearDown(self): self.done.set() thread.exit() class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest): def __init__(self, methodName='runTest'): SocketTCPTest.__init__(self, methodName=methodName) ThreadableTest.__init__(self) def clientSetUp(self): self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def clientTearDown(self): self.cli.close() self.cli = None ThreadableTest.clientTearDown(self) class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): def __init__(self, methodName='runTest'): SocketUDPTest.__init__(self, methodName=methodName) ThreadableTest.__init__(self) def clientSetUp(self): self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) class SocketConnectedTest(ThreadedTCPSocketTest): def __init__(self, methodName='runTest'): ThreadedTCPSocketTest.__init__(self, methodName=methodName) def setUp(self): ThreadedTCPSocketTest.setUp(self) # Indicate explicitly we're ready for the client thread to # proceed and then perform the blocking call to accept self.serverExplicitReady() conn, addr = self.serv.accept() self.cli_conn = conn def tearDown(self): self.cli_conn.close() self.cli_conn = None ThreadedTCPSocketTest.tearDown(self) def clientSetUp(self): ThreadedTCPSocketTest.clientSetUp(self) self.cli.connect((HOST, PORT)) self.serv_conn = self.cli def clientTearDown(self): self.serv_conn.close() self.serv_conn = None ThreadedTCPSocketTest.clientTearDown(self) class SocketPairTest(unittest.TestCase, ThreadableTest): def __init__(self, methodName='runTest'): unittest.TestCase.__init__(self, methodName=methodName) ThreadableTest.__init__(self) def setUp(self): self.serv, self.cli = socket.socketpair() def tearDown(self): self.serv.close() self.serv = None def clientSetUp(self): pass def clientTearDown(self): self.cli.close() self.cli = None ThreadableTest.clientTearDown(self) ####################################################################### ## Begin Tests class GeneralModuleTests(unittest.TestCase): def test_weakref(self): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) p = proxy(s) self.assertEqual(p.fileno(), s.fileno()) s.close() s = None try: p.fileno() except ReferenceError: pass else: self.fail('Socket proxy still exists') def testSocketError(self): # Testing socket module exceptions def raise_error(*args, **kwargs): raise socket.error def raise_herror(*args, **kwargs): raise socket.herror def raise_gaierror(*args, **kwargs): raise socket.gaierror self.assertRaises(socket.error, raise_error, "Error raising socket exception.") self.assertRaises(socket.error, raise_herror, "Error raising socket exception.") self.assertRaises(socket.error, raise_gaierror, "Error raising socket exception.") def testCrucialConstants(self): # Testing for mission critical constants 
socket.AF_INET socket.SOCK_STREAM socket.SOCK_DGRAM socket.SOCK_RAW socket.SOCK_RDM socket.SOCK_SEQPACKET socket.SOL_SOCKET socket.SO_REUSEADDR def testHostnameRes(self): # Testing hostname resolution mechanisms hostname = socket.gethostname() try: ip = socket.gethostbyname(hostname) except socket.error: # Probably name lookup wasn't set up right; skip this test return self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") try: hname, aliases, ipaddrs = socket.gethostbyaddr(ip) except socket.error: # Probably a similar problem as above; skip this test return all_host_names = [hostname, hname] + aliases fqhn = socket.getfqdn(ip) if not fqhn in all_host_names: self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) def testRefCountGetNameInfo(self): # Testing reference count for getnameinfo import sys if hasattr(sys, "getrefcount"): try: # On some versions, this loses a reference orig = sys.getrefcount(__name__) socket.getnameinfo(__name__,0) except SystemError: if sys.getrefcount(__name__) != orig: self.fail("socket.getnameinfo loses a reference") def testInterpreterCrash(self): # Making sure getnameinfo doesn't crash the interpreter try: # On some versions, this crashes the interpreter. socket.getnameinfo(('x', 0, 0, 0), 0) except socket.error: pass def testNtoH(self): # This just checks that htons etc. are their own inverse, # when looking at the lower 16 or 32 bits. sizes = {socket.htonl: 32, socket.ntohl: 32, socket.htons: 16, socket.ntohs: 16} for func, size in list(sizes.items()): mask = (1<= (2, 5): if self.__addr != svr.socket.getsockname(): raise RuntimeError('server_address was %s, expected %s' % (self.__addr, svr.socket.getsockname())) if verbose: print("thread: serving three times") svr.serve_a_few() if verbose: print("thread: done") seed = 0 def pickport(): global seed seed += 1 return 10000 + (os.getpid() % 1000)*10 + seed host = "localhost" testfiles = [] def pickaddr(proto): if proto == socket.AF_INET: return (host, pickport()) else: fn = TESTFN + str(pickport()) if os.name == 'os2': # AF_UNIX socket names on OS/2 require a specific prefix # which can't include a drive letter and must also use # backslashes as directory separators if fn[1] == ':': fn = fn[2:] if fn[0] in (os.sep, os.altsep): fn = fn[1:] fn = os.path.join('\socket', fn) if os.sep == '/': fn = fn.replace(os.sep, os.altsep) else: fn = fn.replace(os.altsep, os.sep) testfiles.append(fn) return fn def cleanup(): for fn in testfiles: try: os.remove(fn) except os.error: pass testfiles[:] = [] def testloop(proto, servers, hdlrcls, testfunc): for svrcls in servers: addr = pickaddr(proto) if verbose: print("ADDR =", addr) print("CLASS =", svrcls) t = ServerThread(addr, svrcls, hdlrcls) if verbose: print("server created") t.start() if verbose: print("server running") for i in range(NREQ): time.sleep(DELAY) if verbose: print("test client", i) testfunc(proto, addr) if verbose: print("waiting for server") t.join() if verbose: print("done") class ForgivingTCPServer(TCPServer): # prevent errors if another process is using the port we want def server_bind(self): host, default_port = self.server_address # this code shamelessly stolen from test.test_support # the ports were changed to protect the innocent import sys for port in [default_port, 3434, 8798, 23833]: try: self.server_address = host, port TCPServer.server_bind(self) break - except socket.error as xxx_todo_changeme: - (err, msg) = xxx_todo_changeme.args + except socket.error as error: + (err, msg) = 
error.args if err != errno.EADDRINUSE: raise print(' WARNING: failed to listen on port %d, trying another' % port, file=sys.__stderr__) tcpservers = [ForgivingTCPServer, ThreadingTCPServer] if hasattr(os, 'fork') and os.name not in ('os2',): tcpservers.append(ForkingTCPServer) udpservers = [UDPServer, ThreadingUDPServer] if hasattr(os, 'fork') and os.name not in ('os2',): udpservers.append(ForkingUDPServer) if not hasattr(socket, 'AF_UNIX'): streamservers = [] dgramservers = [] else: class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass streamservers = [UnixStreamServer, ThreadingUnixStreamServer] if hasattr(os, 'fork') and os.name not in ('os2',): streamservers.append(ForkingUnixStreamServer) class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer] if hasattr(os, 'fork') and os.name not in ('os2',): dgramservers.append(ForkingUnixDatagramServer) def sloppy_cleanup(): # See http://python.org/sf/1540386 # We need to reap children here otherwise a child from one server # can be left running for the next server and cause a test failure. time.sleep(DELAY) reap_children() def testall(): testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream) sloppy_cleanup() testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram) if hasattr(socket, 'AF_UNIX'): sloppy_cleanup() testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream) # Alas, on Linux (at least) recvfrom() doesn't return a meaningful # client address so this cannot work: ##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram) class Test(unittest.TestCase): def tearDown(self): sloppy_cleanup() for tcpserver in tcpservers: n = tcpserver.__name__ exec("""def test_%s(self): testloop(socket.AF_INET, [%s], MyStreamHandler, teststream)""" % (n,n)) for udpserver in udpservers: n = udpserver.__name__ exec("""def test_%s(self): testloop(socket.AF_INET, [%s], MyDatagramHandler, testdgram)""" % (n,n)) if hasattr(socket, 'AF_UNIX'): for streamserver in streamservers: n = streamserver.__name__ exec("""def test_%s(self): testloop(socket.AF_UNIX, [%s], MyStreamHandler, teststream)""" % (n,n)) def testall(): test_support.run_unittest(Test) def test_main(): import imp if imp.lock_held(): # If the import lock is held, the threads will hang. raise TestSkipped("can't run when import lock is held") try: testall() finally: cleanup() reap_children() if __name__ == "__main__": test_main() diff --git a/greentest/test_support.py b/greentest/test_support.py index 90cf87d..506f04f 100644 --- a/greentest/test_support.py +++ b/greentest/test_support.py @@ -1,529 +1,531 @@ """Supporting definitions for the Python regression tests.""" if __name__ != 'greentest.test_support': raise ImportError('test_support must be imported from the test package') import sys class Error(Exception): """Base class for regression test exceptions.""" class TestFailed(Error): """Test failed.""" class TestSkipped(Error): """Test skipped. This can be raised to indicate that a test was deliberately skipped, but not because a feature wasn't available. For example, if some resource can't be used, such as when the network appears to be unavailable, this should be raised instead of TestFailed. """ class ResourceDenied(TestSkipped): """Test skipped because it requested a disallowed resource. This is raised when a test calls requires() for a resource that has not been enabled. It is used to distinguish between expected and unexpected skips.
""" verbose = 1 # Flag set to 0 by regrtest.py use_resources = None # Flag set to [] by regrtest.py max_memuse = 0 # Disable bigmem tests (they will still be run with # small sizes, to make sure they work.) # _original_stdout is meant to hold stdout at the time regrtest began. # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. # The point is to have some flavor of stdout the user can actually see. _original_stdout = None def record_original_stdout(stdout): global _original_stdout _original_stdout = stdout def get_original_stdout(): return _original_stdout or sys.stdout def unload(name): try: del sys.modules[name] except KeyError: pass def unlink(filename): import os try: os.unlink(filename) except OSError: pass def forget(modname): '''"Forget" a module was ever imported by removing it from sys.modules and deleting any .pyc and .pyo files.''' unload(modname) import os for dirname in sys.path: unlink(os.path.join(dirname, modname + os.extsep + 'pyc')) # Deleting the .pyo file cannot be within the 'try' for the .pyc since # the chance exists that there is no .pyc (and thus the 'try' statement # is exited) but there is a .pyo file. unlink(os.path.join(dirname, modname + os.extsep + 'pyo')) def is_resource_enabled(resource): """Test whether a resource is enabled. Known resources are set by regrtest.py.""" return use_resources is not None and resource in use_resources def requires(resource, msg=None): """Raise ResourceDenied if the specified resource is not available. If the caller's module is __main__ then automatically return True. The possibility of False being returned occurs when regrtest.py is executing.""" # see if the caller's module is __main__ - if so, treat as if # the resource was set return if sys._getframe().f_back.f_globals.get("__name__") == "__main__": return if not is_resource_enabled(resource): if msg is None: msg = "Use of the `%s' resource not enabled" % resource raise ResourceDenied(msg) def bind_port(sock, host='', preferred_port=54321): """Try to bind the sock to a port. If we are running multiple tests and we don't try multiple ports, the test can fails. This makes the test more robust.""" import socket, errno # Find some random ports that hopefully no one is listening on. # Ideally each test would clean up after itself and not continue listening # on any ports. However, this isn't the case. The last port (0) is # a stop-gap that asks the O/S to assign a port. Whenever the warning # message below is printed, the test that is listening on the port should # be fixed to close the socket at the end of the test. # Another reason why we can't use a port is another process (possibly # another instance of the test suite) is using the same port. 
for port in [preferred_port, 9907, 10243, 32999, 0]: try: sock.bind((host, port)) if port == 0: port = sock.getsockname()[1] return port - except socket.error as xxx_todo_changeme: - (err, msg) = xxx_todo_changeme.args + except socket.error as error: + (err, msg) = error.args if err != errno.EADDRINUSE: raise print(' WARNING: failed to listen on port %d, trying another' % port, file=sys.__stderr__) raise TestFailed('unable to find port to listen on') FUZZ = 1e-6 def fcmp(x, y): # fuzzy comparison function if type(x) == type(0.0) or type(y) == type(0.0): try: x, y = coerce(x, y) fuzz = (abs(x) + abs(y)) * FUZZ if abs(x-y) <= fuzz: return 0 except: pass elif type(x) == type(y) and type(x) in (type(()), type([])): for i in range(min(len(x), len(y))): outcome = fcmp(x[i], y[i]) if outcome != 0: return outcome return cmp(len(x), len(y)) return cmp(x, y) try: str have_unicode = 1 except NameError: have_unicode = 0 is_jython = sys.platform.startswith('java') import os # Filename used for testing if os.name == 'java': # Jython disallows @ in module names TESTFN = '$test' elif os.name == 'riscos': TESTFN = 'testfile' else: TESTFN = '@test' # Unicode name only used if TEST_FN_ENCODING exists for the platform. if have_unicode: # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding() # TESTFN_UNICODE is a filename that can be encoded using the # file system encoding, but *not* with the default (ascii) encoding if isinstance('', str): # python -U # XXX perhaps unicode() should accept Unicode strings? TESTFN_UNICODE = "@test-\xe0\xf2" else: # 2 latin characters. TESTFN_UNICODE = str("@test-\xe0\xf2", "latin-1") TESTFN_ENCODING = sys.getfilesystemencoding() # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be # able to be encoded by *either* the default or filesystem encoding. # This test really only makes sense on Windows NT platforms # which have special Unicode support in posixmodule. if (not hasattr(sys, "getwindowsversion") or sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME TESTFN_UNICODE_UNENCODEABLE = None else: # Japanese characters (I think - from bug 846133) TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\\u5171\\u6709\\u3055\\u308c\\u308b"') try: # XXX - Note - should be using TESTFN_ENCODING here - but for # Windows, "mbcs" currently always operates as if in # errors=ignore' mode - hence we get '?' characters rather than # the exception. 'Latin1' operates as we expect - ie, fails. # See [ 850997 ] mbcs encoding ignores errors TESTFN_UNICODE_UNENCODEABLE.encode("Latin1") except UnicodeEncodeError: pass else: print('WARNING: The filename %r CAN be encoded by the filesystem. ' \ 'Unicode filename tests may not be effective' \ % TESTFN_UNICODE_UNENCODEABLE) # Make sure we can write to TESTFN, try in /tmp if we can't fp = None try: fp = open(TESTFN, 'w+') except IOError: TMP_TESTFN = os.path.join('/tmp', TESTFN) try: fp = open(TMP_TESTFN, 'w+') TESTFN = TMP_TESTFN del TMP_TESTFN except IOError: print(('WARNING: tests will fail, unable to write to: %s or %s' % (TESTFN, TMP_TESTFN))) if fp is not None: fp.close() unlink(TESTFN) del os, fp def findfile(file, here=__file__): """Try to find a file on sys.path and the working directory. 
If it is not found the argument passed to the function is returned (this does not necessarily signal failure; could still be the legitimate path).""" import os if os.path.isabs(file): return file path = sys.path path = [os.path.dirname(here)] + path for dn in path: fn = os.path.join(dn, file) if os.path.exists(fn): return fn return file def verify(condition, reason='test failed'): """Verify that condition is true. If not, raise TestFailed. The optional argument reason can be given to provide a better error text. """ if not condition: raise TestFailed(reason) def vereq(a, b): """Raise TestFailed if a == b is false. This is better than verify(a == b) because, in case of failure, the error message incorporates repr(a) and repr(b) so you can see the inputs. Note that "not (a == b)" isn't necessarily the same as "a != b"; the former is tested. """ if not (a == b): raise TestFailed("%r == %r" % (a, b)) def sortdict(dict): "Like repr(dict), but in sorted order." items = list(dict.items()) items.sort() reprpairs = ["%r: %r" % pair for pair in items] withcommas = ", ".join(reprpairs) return "{%s}" % withcommas def check_syntax(statement): try: compile(statement, '', 'exec') except SyntaxError: pass else: print('Missing SyntaxError: "%s"' % statement) def open_urlresource(url): import urllib.request, urllib.parse, urllib.error, urllib.parse import os.path filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! for path in [os.path.curdir, os.path.pardir]: fn = os.path.join(path, filename) if os.path.exists(fn): return open(fn) requires('urlfetch') print('\tfetching %s ...' % url, file=get_original_stdout()) fn, _ = urllib.request.urlretrieve(url, filename) return open(fn) #======================================================================= # Decorator for running a function in a different locale, correctly resetting # it afterwards. def run_with_locale(catstr, *locales): def decorator(func): def inner(*args, **kwds): try: import locale category = getattr(locale, catstr) orig_locale = locale.setlocale(category) except AttributeError: # if the test author gives us an invalid category string raise except: # cannot retrieve original locale, so do nothing locale = orig_locale = None else: for loc in locales: try: locale.setlocale(category, loc) break except: pass # now run the function, resetting the locale on exceptions try: return func(*args, **kwds) finally: if locale and orig_locale: locale.setlocale(category, orig_locale) inner.__name__ = func.__name__ inner.__doc__ = func.__doc__ return inner return decorator #======================================================================= # Big-memory-test support. Separate from 'resources' because memory use should be configurable. # Some handy shorthands. Note that these are used for byte-limits as well # as size-limits, in the various bigmem tests _1M = 1024*1024 _1G = 1024 * _1M _2G = 2 * _1G # Hack to get at the maximum value an internal index can take. class _Dummy: def __getslice__(self, i, j): return j + def __getitem__(self, i): + return i MAX_Py_ssize_t = _Dummy()[:] def set_memlimit(limit): import re global max_memuse sizes = { 'k': 1024, 'm': _1M, 'g': _1G, 't': 1024*_1G, } m = re.match(r'(\d+(\.\d+)?) 
(K|M|G|T)b?$', limit, re.IGNORECASE | re.VERBOSE) if m is None: raise ValueError('Invalid memory limit %r' % (limit,)) memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) if memlimit > MAX_Py_ssize_t: memlimit = MAX_Py_ssize_t if memlimit < _2G - 1: raise ValueError('Memory limit %r too low to be useful' % (limit,)) max_memuse = memlimit def bigmemtest(minsize, memuse, overhead=5*_1M): """Decorator for bigmem tests. 'minsize' is the minimum useful size for the test (in arbitrary, test-interpreted units.) 'memuse' is the number of 'bytes per size' for the test, or a good estimate of it. 'overhead' specifies fixed overhead, independent of the test size, and defaults to 5Mb. The decorator tries to guess a good value for 'size' and passes it to the decorated test function. If minsize * memuse is more than the allowed memory use (as defined by max_memuse), the test is skipped. Otherwise, minsize is adjusted upward to use up to max_memuse. """ def decorator(f): def wrapper(self): if not max_memuse: # If max_memuse is 0 (the default), # we still want to run the tests with size set to a few kb, # to make sure they work. We still want to avoid using # too much memory, though, but we do that noisily. maxsize = 5147 self.assertFalse(maxsize * memuse + overhead > 20 * _1M) else: maxsize = int((max_memuse - overhead) / memuse) if maxsize < minsize: # Really ought to print 'test skipped' or something if verbose: sys.stderr.write("Skipping %s because of memory " "constraint\n" % (f.__name__,)) return # Try to keep some breathing room in memory use maxsize = max(maxsize - 50 * _1M, minsize) return f(self, maxsize) wrapper.minsize = minsize wrapper.memuse = memuse wrapper.overhead = overhead return wrapper return decorator def bigaddrspacetest(f): """Decorator for tests that fill the address space.""" def wrapper(self): if max_memuse < MAX_Py_ssize_t: if verbose: sys.stderr.write("Skipping %s because of memory " "constraint\n" % (f.__name__,)) else: return f(self) return wrapper #======================================================================= # Preliminary PyUNIT integration. import unittest class BasicTestRunner: def run(self, test): result = unittest.TestResult() test(result) return result def run_suite(suite, testclass=None): """Run tests from a unittest.TestSuite-derived class.""" if verbose: runner = unittest.TextTestRunner(sys.stdout, verbosity=2) else: runner = BasicTestRunner() result = runner.run(suite) if not result.wasSuccessful(): if len(result.errors) == 1 and not result.failures: err = result.errors[0][1] elif len(result.failures) == 1 and not result.errors: err = result.failures[0][1] else: if testclass is None: msg = "errors occurred; run in verbose mode for details" else: msg = "errors occurred in %s.%s" \ % (testclass.__module__, testclass.__name__) raise TestFailed(msg) raise TestFailed(err) def run_unittest(*classes): """Run tests from unittest.TestCase-derived classes.""" suite = unittest.TestSuite() for cls in classes: if isinstance(cls, (unittest.TestSuite, unittest.TestCase)): suite.addTest(cls) else: suite.addTest(unittest.makeSuite(cls)) if len(classes)==1: testclass = classes[0] else: testclass = None run_suite(suite, testclass) #======================================================================= # doctest driver. def run_doctest(module, verbosity=None): """Run doctest on the given module. Return (#failures, #tests). If optional argument verbosity is not specified (or is None), pass test_support's belief about verbosity on to doctest.
Else doctest's usual behavior is used (it searches sys.argv for -v). """ import doctest if verbosity is None: verbosity = verbose else: verbosity = None # Direct doctest output (normally just errors) to real stdout; doctest # output shouldn't be compared by regrtest. save_stdout = sys.stdout sys.stdout = get_original_stdout() try: f, t = doctest.testmod(module, verbose=verbosity) if f: raise TestFailed("%d of %d doctests failed" % (f, t)) finally: sys.stdout = save_stdout if verbose: print('doctest (%s) ... %d tests with zero failures' % (module.__name__, t)) return f, t #======================================================================= # Threading support to prevent reporting refleaks when running regrtest.py -R def threading_setup(): from eventlib.green import threading return len(threading._active), len(threading._limbo) def threading_cleanup(num_active, num_limbo): from eventlib.green import threading from eventlib.green import time _MAX_COUNT = 10 count = 0 while len(threading._active) != num_active and count < _MAX_COUNT: print(threading._active) count += 1 time.sleep(0.1) count = 0 while len(threading._limbo) != num_limbo and count < _MAX_COUNT: print(threading._limbo) count += 1 time.sleep(0.1) def reap_children(): """Use this function at the end of test_main() whenever sub-processes are started. This will help ensure that no extra children (zombies) stick around to hog resources and create problems when looking for refleaks. """ # Reap all our dead child processes so we don't leave zombies around. # These hog resources and might be causing some of the buildbots to die. import os if hasattr(os, 'waitpid'): any_process = -1 while True: try: # This will raise an exception on Windows. That's ok. pid, status = os.waitpid(any_process, os.WNOHANG) if pid == 0: break except: break diff --git a/greentest/test_thread__boundedsem.py b/greentest/test_thread__boundedsem.py index f54fe8f..2080e0a 100644 --- a/greentest/test_thread__boundedsem.py +++ b/greentest/test_thread__boundedsem.py @@ -1,11 +1,11 @@ """Test that BoundedSemaphore with a very high bound is as good as unbounded one""" from eventlib import coros from eventlib.green import thread def allocate_lock(): return coros.semaphore(1, 9999) thread.allocate_lock = allocate_lock thread.LockType = coros.BoundedSemaphore -exec(compile(open('test_thread.py', "rb").read(), 'test_thread.py', 'exec')) +exec(compile(open('test_thread.py').read(), 'test_thread.py', 'exec')) diff --git a/greentest/test_urllib2_localnet.py b/greentest/test_urllib2_localnet.py index bf5c525..0423429 100755 --- a/greentest/test_urllib2_localnet.py +++ b/greentest/test_urllib2_localnet.py @@ -1,306 +1,306 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 from greentest import exit_unless_25; exit_unless_25() import sys import urllib.parse import unittest import hashlib from greentest import test_support from eventlib.green import threading from eventlib.green import socket from eventlib.green import urllib2 from eventlib.green import BaseHTTPServer # Loopback http server infrastructure class LoopbackHttpServer(BaseHTTPServer.HTTPServer): """HTTP server w/ a few modifications that make it useful for loopback testing purposes. """ def __init__(self, server_address, RequestHandlerClass): BaseHTTPServer.HTTPServer.__init__(self, server_address, RequestHandlerClass) # Set the timeout of our listening socket really low so # that we can stop the server easily. 
self.socket.settimeout(1.0) def get_request(self): """BaseHTTPServer method, overridden.""" request, client_address = self.socket.accept() # It's a loopback connection, so setting the timeout # really low shouldn't affect anything, but should make # deadlocks less likely to occur. request.settimeout(10.0) return (request, client_address) class LoopbackHttpServerThread(threading.Thread): """Stoppable thread that runs a loopback http server.""" def __init__(self, port, RequestHandlerClass): threading.Thread.__init__(self) self._RequestHandlerClass = RequestHandlerClass self._stop = False self._port = port self._server_address = ('127.0.0.1', self._port) self.ready = threading.Event() def stop(self): """Stops the webserver if it's currently running.""" # Set the stop flag. self._stop = True self.join() def run(self): protocol = "HTTP/1.0" self._RequestHandlerClass.protocol_version = protocol httpd = LoopbackHttpServer(self._server_address, self._RequestHandlerClass) sa = httpd.socket.getsockname() #print "Serving HTTP on", sa[0], "port", sa[1], "..." self.ready.set() while not self._stop: httpd.handle_request() # Authentication infrastructure class DigestAuthHandler: """Handler for performing digest authentication.""" def __init__(self): self._request_num = 0 self._nonces = [] self._users = {} self._realm_name = "Test Realm" self._qop = "auth" def set_qop(self, qop): self._qop = qop def set_users(self, users): assert isinstance(users, dict) self._users = users def set_realm(self, realm): self._realm_name = realm def _generate_nonce(self): self._request_num += 1 nonce = hashlib.md5(str(self._request_num)).hexdigest() self._nonces.append(nonce) return nonce def _create_auth_dict(self, auth_str): first_space_index = auth_str.find(" ") auth_str = auth_str[first_space_index+1:] parts = auth_str.split(",") auth_dict = {} for part in parts: name, value = part.split("=") name = name.strip() if value[0] == '"' and value[-1] == '"': value = value[1:-1] else: value = value.strip() auth_dict[name] = value return auth_dict def _validate_auth(self, auth_dict, password, method, uri): final_dict = {} final_dict.update(auth_dict) final_dict["password"] = password final_dict["method"] = method final_dict["uri"] = uri HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict HA1 = hashlib.md5(HA1_str).hexdigest() HA2_str = "%(method)s:%(uri)s" % final_dict HA2 = hashlib.md5(HA2_str).hexdigest() final_dict["HA1"] = HA1 final_dict["HA2"] = HA2 response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \ "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict response = hashlib.md5(response_str).hexdigest() return response == auth_dict["response"] def _return_auth_challenge(self, request_handler): request_handler.send_response(407, "Proxy Authentication Required") request_handler.send_header("Content-Type", "text/html") request_handler.send_header( 'Proxy-Authenticate', 'Digest realm="%s", ' 'qop="%s",' 'nonce="%s", ' % \ (self._realm_name, self._qop, self._generate_nonce())) # XXX: Not sure if we're supposed to add this next header or # not. #request_handler.send_header('Connection', 'close') request_handler.end_headers() request_handler.wfile.write("Proxy Authentication Required.") return False def handle_request(self, request_handler): """Performs digest authentication on the given HTTP request handler. Returns True if authentication was successful, False otherwise. If no users have been set, then digest auth is effectively disabled and this method will always return True. 
""" if len(self._users) == 0: return True if 'Proxy-Authorization' not in request_handler.headers: return self._return_auth_challenge(request_handler) else: auth_dict = self._create_auth_dict( request_handler.headers['Proxy-Authorization'] ) if auth_dict["username"] in self._users: password = self._users[ auth_dict["username"] ] else: return self._return_auth_challenge(request_handler) if not auth_dict.get("nonce") in self._nonces: return self._return_auth_challenge(request_handler) else: self._nonces.remove(auth_dict["nonce"]) auth_validated = False # MSIE uses short_path in its validation, but Python's # urllib2 uses the full path, so we're going to see if # either of them works here. for path in [request_handler.path, request_handler.short_path]: if self._validate_auth(auth_dict, password, request_handler.command, path): auth_validated = True if not auth_validated: return self._return_auth_challenge(request_handler) return True # Proxy test infrastructure class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): """This is a 'fake proxy' that makes it look like the entire internet has gone down due to a sudden zombie invasion. It main utility is in providing us with authentication support for testing. """ digest_auth_handler = DigestAuthHandler() def log_message(self, format, *args): # Uncomment the next line for debugging. #sys.stderr.write(format % args) pass def do_GET(self): (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse( self.path, 'http') self.short_path = path if self.digest_auth_handler.handle_request(self): self.send_response(200, "OK") self.send_header("Content-Type", "text/html") self.end_headers() self.wfile.write("You've reached %s!
" % self.path) self.wfile.write("Our apologies, but our server is down due to " "a sudden zombie invasion.") # Test cases class ProxyAuthTests(unittest.TestCase): URL = "http://www.foo.com" PORT = 8080 USER = "tester" PASSWD = "test123" REALM = "TestRealm" PROXY_URL = "http://127.0.0.1:%d" % PORT def setUp(self): FakeProxyHandler.digest_auth_handler.set_users({ self.USER : self.PASSWD }) FakeProxyHandler.digest_auth_handler.set_realm(self.REALM) self.server = LoopbackHttpServerThread(self.PORT, FakeProxyHandler) self.server.start() self.server.ready.wait() handler = urllib.request.ProxyHandler({"http" : self.PROXY_URL}) self._digest_auth_handler = urllib.request.ProxyDigestAuthHandler() self.opener = urllib.request.build_opener(handler, self._digest_auth_handler) def tearDown(self): self.server.stop() def test_proxy_with_bad_password_raises_httperror(self): self._digest_auth_handler.add_password(self.REALM, self.URL, self.USER, self.PASSWD+"bad") FakeProxyHandler.digest_auth_handler.set_qop("auth") self.assertRaises(urllib.error.HTTPError, self.opener.open, self.URL) def test_proxy_with_no_password_raises_httperror(self): FakeProxyHandler.digest_auth_handler.set_qop("auth") self.assertRaises(urllib.error.HTTPError, self.opener.open, self.URL) def test_proxy_qop_auth_works(self): self._digest_auth_handler.add_password(self.REALM, self.URL, self.USER, self.PASSWD) FakeProxyHandler.digest_auth_handler.set_qop("auth") result = self.opener.open(self.URL) while result.read(): pass result.close() def test_proxy_qop_auth_int_works_or_throws_urlerror(self): self._digest_auth_handler.add_password(self.REALM, self.URL, self.USER, self.PASSWD) FakeProxyHandler.digest_auth_handler.set_qop("auth-int") try: result = self.opener.open(self.URL) except urllib.error.URLError: # It's okay if we don't support auth-int, but we certainly # shouldn't receive any kind of exception here other than # a URLError. result = None if result: while result.read(): pass result.close() def test_main(): # We will NOT depend on the network resource flag # (Lib/test/regrtest.py -u network) since all tests here are only # localhost. However, if this is a bad rationale, then uncomment # the next line. #test_support.requires("network") test_support.run_unittest(ProxyAuthTests) if __name__ == "__main__": test_main() diff --git a/greentest/with_eventlet.py b/greentest/with_eventlet.py index 3e7c2a7..567060f 100755 --- a/greentest/with_eventlet.py +++ b/greentest/with_eventlet.py @@ -1,75 +1,75 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """Execute a Python script with the hub installed. Usage: %prog [--hub HUB] [--reactor REACTOR] program.py """ import sys def import_reactor(reactor): m = __import__('twisted.internet.' + reactor) return getattr(m.internet, reactor) def setup_hub(hub, reactor): if reactor is not None: import_reactor(reactor).install() if hub is not None: from eventlib.api import use_hub try: use_hub(hub) except ImportError as ex: # as a shortcut, try to import a reactor with that name try: r = import_reactor(hub) except ImportError: sys.exit('No hub %s: %s' % (hub, ex)) else: r.install() use_hub('twistedr') def parse_args(): hub = None reactor = None del sys.argv[0] # drop with_eventlet.py itself from argv if sys.argv[0]=='--hub': del sys.argv[0] hub = sys.argv[0] del sys.argv[0] if sys.argv[0]=='--reactor': del sys.argv[0] reactor = sys.argv[0] del sys.argv[0] return hub, reactor if __name__=='__main__': hub, reactor = parse_args() setup_hub(hub, reactor) from eventlib.api import get_hub hub = get_hub() # set up the hub now print('===HUB=%r' % hub) if 'twisted.internet.reactor' in sys.modules: print('===REACTOR=%r' % sys.modules['twisted.internet.reactor']) sys.stdout.flush() - exec(compile(open(sys.argv[0], "rb").read(), sys.argv[0], 'exec')) + exec(compile(open(sys.argv[0]).read(), sys.argv[0], 'exec')) diff --git a/greentest/with_timeout.py b/greentest/with_timeout.py index ad9645c..1b471ca 100755 --- a/greentest/with_timeout.py +++ b/greentest/with_timeout.py @@ -1,220 +1,219 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """ Run a Python script in a child process. Kill it after the timeout has elapsed. If the script was running unittest test cases, the timed-out test case is disabled and the script is restarted.
Usage: %prog [-t TIMEOUT] program.py [args] If program.py times out, return 7 If program.py exits with non-zero value, return 8 If program.py exits with zero value after several runs, return 9 If program.py exits with non-zero value after several runs, return 10 """ import sys import os import time import warnings +import tempfile if sys.argv[1:2] and sys.argv[1]=='-t': del sys.argv[1] TIMEOUT = int(sys.argv[1]) del sys.argv[1] else: TIMEOUT = 20 try: disabled_tests except NameError: disabled_tests = [] try: CURRENT_TEST_FILENAME except NameError: - warnings.filterwarnings('ignore', 'tmpnam is a potential security risk to your program') - CURRENT_TEST_FILENAME = os.tmpnam() - del warnings.filters[0] + CURRENT_TEST_FILENAME = tempfile.NamedTemporaryFile(delete=False).name class Alarm(Exception): pass def al(*args): raise Alarm def _test(): """ >>> system('./with_timeout.py -t 3 __init__.py') (0, 0) - >>> system('./with_timeout.py -t 3 /usr/lib/python2.5/BaseHTTPServer.py 0') + >>> system('./with_timeout.py -t 3 /usr/lib/python3.5/http/server.py 0') (7, 3) >>> system('./with_timeout.py -t 3 with_timeout.py --selftest1') (9, 3) >>> system('./with_timeout.py -t 3 with_timeout.py --selftest2') (10, 3) >>> system('./with_timeout.py -t 3 with_timeout.py no_such_file.xxx') (8, 0) """ import doctest doctest.testmod() if not sys.argv[1:]: def system(*args): start = time.time() res = os.system(*args) return res>>8, int(time.time()-start) #system('./with_timeout.py -t 3 with_timeout.py selftest') #sys.exit(0) _test() sys.exit(__doc__.replace('%prog', sys.argv[0])) elif sys.argv[1:]==['--selftest1']: import unittest class Test(unittest.TestCase): def test1(self): pass def test_long(self): time.sleep(10) from greentest import test_support test_support.run_unittest(Test) sys.exit(0) elif sys.argv[1:]==['--selftest2']: import unittest class Test(unittest.TestCase): def test_fail(self): fail def test_long(self): time.sleep(10) from greentest import test_support test_support.run_unittest(Test) sys.exit(0) filename = sys.argv[1] del sys.argv[0] def execf(): #print 'in execf', disabled_tests def patch_unittest(): - "print test name before it was run and write it pipe" + "print the test name before it is run and write it to the pipe" import unittest class TestCase(unittest.TestCase): base = unittest.TestCase def run(self, result=None): try: testMethodName = self._testMethodName except: testMethodName = self.__testMethodName name = "%s.%s" % (self.__class__.__name__, testMethodName) if name in disabled_tests: return print(name, ' ') sys.stdout.flush() - file(CURRENT_TEST_FILENAME, 'w').write(name) + open(CURRENT_TEST_FILENAME, "w").write(name) try: return self.base.run(self, result) finally: sys.stdout.flush() try: os.unlink(CURRENT_TEST_FILENAME) except: pass unittest.TestCase = TestCase patch_unittest() - exec(compile(open(filename, "rb").read(), filename, 'exec'), globals()) + exec(compile(open(filename).read(), filename, 'exec'), globals()) while True: #print 'before fork, %s' % disabled_tests try: os.unlink(CURRENT_TEST_FILENAME) except: pass child = os.fork() if child == 0: print('===PYTHON=%s.%s.%s' % sys.version_info[:3]) print('===ARGV=%s' % ' '.join(sys.argv)) print('===TIMEOUT=%r' % TIMEOUT) sys.stdout.flush() execf() break else: start = time.time() import signal signal.signal(signal.SIGALRM, al) signal.alarm(TIMEOUT) pid = None try: pid, status = os.waitpid(child, 0) signal.alarm(0) except Alarm: try: os.kill(child, signal.SIGKILL) except Exception: pass print('\n===%s
was killed after %s seconds' % (child, time.time()-start)) sys.stdout.flush() bad_test = None try: - bad_test = file(CURRENT_TEST_FILENAME).read() + bad_test = open(CURRENT_TEST_FILENAME).read() except IOError: pass if bad_test in disabled_tests: print('\n===%s was disabled but it still managed to fail?!' % bad_test) sys.stdout.flush() break if bad_test is None: sys.exit(7) print('\n===Trying again, now without %s' % bad_test) sys.stdout.flush() disabled_tests.append(bad_test) except: try: signal.alarm(0) except: pass try: os.kill(child, signal.SIGKILL) except: pass raise else: print('===%s exited with code %s' % (pid, status)) sys.stdout.flush() if disabled_tests: print('\n===disabled because of timeout: %s\n%s\n' % (len(disabled_tests), '\n'.join(disabled_tests))) sys.stdout.flush() if disabled_tests: if status: retcode = 10 else: retcode = 9 else: if status: retcode = 8 else: retcode = 0 sys.exit(retcode)
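
Editor's note: the loop above is the core of with_timeout.py -- fork a child to run the test script, arm SIGALRM in the parent, and SIGKILL the child if it is still alive when the alarm fires, recording the offending test so the next run can disable it. Below is a minimal, self-contained sketch of just that fork/alarm/kill mechanism; it is illustrative only, not part of the patched files, and the names run_with_watchdog and payload are invented for the example (POSIX-only, like the script itself).

import os
import signal
import time

class Alarm(Exception):
    """Raised by the SIGALRM handler so os.waitpid() gets interrupted."""

def _alarm_handler(signum, frame):
    raise Alarm

def run_with_watchdog(payload, timeout):
    """Run payload() in a forked child; kill it after `timeout` seconds.

    Returns the child's raw exit status, or None if the watchdog killed it.
    """
    child = os.fork()
    if child == 0:
        payload()      # child: run the workload...
        os._exit(0)    # ...and exit without unwinding the parent's stack
    signal.signal(signal.SIGALRM, _alarm_handler)
    signal.alarm(timeout)                # parent: arm the watchdog
    try:
        _pid, status = os.waitpid(child, 0)
        signal.alarm(0)                  # child finished in time; disarm
        return status
    except Alarm:
        os.kill(child, signal.SIGKILL)   # timed out: kill the child...
        os.waitpid(child, 0)             # ...and reap it to avoid a zombie
        return None

if __name__ == '__main__':
    print(run_with_watchdog(lambda: time.sleep(5), 2))  # None (killed)
    print(run_with_watchdog(lambda: None, 2))           # 0 (clean exit)

with_timeout.py layers two more ideas on top of this: the patched unittest.TestCase.run writes the currently running test's name to a temporary file, so after a kill the parent knows which test to append to disabled_tests, and the whole fork/wait cycle sits in a while True loop so the script is re-executed until it either passes or fails without timing out.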