diff --git a/examples/connect.py b/examples/connect.py index 7cff938..72e45e5 100644 --- a/examples/connect.py +++ b/examples/connect.py @@ -1,52 +1,52 @@ # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """Spawn multiple workers and collect their results. -Demonstrates how to use eventlet.green package and proc module. +Demonstrates how to use eventlib.green package and proc module. """ -from eventlet import proc -from eventlet.green import socket +from eventlib import proc +from eventlib.green import socket -# this example works with both standard eventlet hubs and with twisted-based hub +# this example works with both standard eventlib hubs and with twisted-based hub # uncomment the following line to use twisted hub #from twisted.internet import reactor def geturl(url): c = socket.socket() ip = socket.gethostbyname(url) c.connect((ip, 80)) print '%s connected' % url c.send('GET /\r\n\r\n') return c.recv(1024) -urls = ['www.google.com', 'www.yandex.ru', 'www.python.org'] +urls = ['www.google.com', 'www.yandex.ru', 'www.python.org', 'ag-projects.com', 'sylkserver.com'] jobs = [proc.spawn(geturl, x) for x in urls] print 'spawned %s jobs' % len(jobs) # collect the results from workers results = proc.waitall(jobs) # Note, that any exception in the workers will be reraised by waitall # unless trap_errors argument specifies otherwise for url, result in zip(urls, results): print '%s: %s' % (url, repr(result)[:50]) diff --git a/examples/echoserver.py b/examples/echoserver.py index 0cc1e70..9bbbb0c 100755 --- a/examples/echoserver.py +++ b/examples/echoserver.py @@ -1,55 +1,55 @@ #!/usr/bin/python2 """\ @file echoserver.py Simple server that listens on port 6000 and echos back every input to the client. To try out the server, start it up by running this file. Connect to it with: telnet localhost 6000 You terminate your connection by terminating telnet (typically Ctrl-] and then 'quit') Copyright (c) 2007, Linden Research, Inc. 
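The connect.py example above spawns one greenlet per URL with proc.spawn and gathers the results with proc.waitall, which re-raises any worker exception unless trap_errors says otherwise. A minimal standalone sketch of that pattern, using only the eventlib.proc and eventlib.green.socket calls shown in the diff (the fetch_banner helper and its host list are illustrative, not part of the example):

# Sketch: spawn workers and collect results, trapping errors instead of
# letting proc.waitall re-raise them.
from eventlib import proc
from eventlib.green import socket

def fetch_banner(host):
    # connect with the cooperative (green) socket so other workers keep running
    s = socket.socket()
    s.connect((socket.gethostbyname(host), 80))
    s.send('HEAD / HTTP/1.0\r\n\r\n')
    try:
        return s.recv(128)
    finally:
        s.close()

hosts = ['www.google.com', 'www.python.org']
jobs = [proc.spawn(fetch_banner, h) for h in hosts]
# trap_errors=True returns exceptions as results instead of re-raising them
results = proc.waitall(jobs, trap_errors=True)
for host, result in zip(hosts, results):
    print '%s -> %r' % (host, str(result)[:40])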
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ -from eventlet import api +from eventlib import api def handle_socket(reader, writer): print "client connected" while True: # pass through every non-eof line x = reader.readline() if not x: break writer.write(x) print "echoed", x print "client disconnected" print "server socket listening on port 6000" server = api.tcp_listener(('0.0.0.0', 6000)) while True: try: new_sock, address = server.accept() except KeyboardInterrupt: break # handle every new connection with a new coroutine api.spawn(handle_socket, new_sock.makefile('r'), new_sock.makefile('w')) diff --git a/examples/twisted_client.py b/examples/twisted_client.py index 38061aa..eca9521 100644 --- a/examples/twisted_client.py +++ b/examples/twisted_client.py @@ -1,26 +1,26 @@ """Example for GreenTransport and GreenClientCreator. In this example reactor is started implicitly upon the first use of a blocking function. 
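echoserver.py above handles each accepted connection in its own coroutine via api.spawn. A small client sketch to exercise it, assuming the green socket module already used in connect.py so the client does not block the hub (echo_once is an illustrative helper, not part of the example):

# Sketch: talk to the echo server on port 6000 with the cooperative socket.
from eventlib.green import socket

def echo_once(line):
    s = socket.socket()
    s.connect(('127.0.0.1', 6000))
    s.send(line)                    # the server echoes every non-eof line back
    reply = s.recv(1024)
    s.close()
    return reply

print repr(echo_once('hello echo\n'))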
""" from twisted.internet import ssl from twisted.internet.error import ConnectionClosed -from eventlet.twistedutil.protocol import GreenClientCreator -from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport +from eventlib.twistedutil.protocol import GreenClientCreator +from eventlib.twistedutil.protocols.basic import LineOnlyReceiverTransport from twisted.internet import reactor # read from TCP connection conn = GreenClientCreator(reactor).connectTCP('www.google.com', 80) conn.write('GET / HTTP/1.0\r\n\r\n') conn.loseWriteConnection() print conn.read() # read from SSL connection line by line conn = GreenClientCreator(reactor, LineOnlyReceiverTransport).connectSSL('sf.net', 443, ssl.ClientContextFactory()) conn.write('GET / HTTP/1.0\r\n\r\n') try: for num, line in enumerate(conn): print '%3s %r' % (num, line) except ConnectionClosed, ex: print ex diff --git a/examples/twisted_http_proxy.py b/examples/twisted_http_proxy.py index 6783a5b..20dae75 100644 --- a/examples/twisted_http_proxy.py +++ b/examples/twisted_http_proxy.py @@ -1,90 +1,90 @@ # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """Listen on port 8888 and pretend to be an HTTP proxy. It even works for some pages. Demonstrates how to - * plug in eventlet into a twisted application (join_reactor) + * plug in eventlib into a twisted application (join_reactor) * call green functions from places where blocking calls are not allowed (deferToGreenThread) - * use eventlet.green package which provides [some of] the + * use eventlib.green package which provides [some of] the standard library modules that don't block other greenlets. """ import re from twisted.internet.protocol import Factory from twisted.internet import reactor from twisted.protocols import basic -from eventlet.twistedutil import deferToGreenThread -from eventlet.twistedutil import join_reactor -from eventlet.green import httplib +from eventlib.twistedutil import deferToGreenThread +from eventlib.twistedutil import join_reactor +from eventlib.green import httplib class LineOnlyReceiver(basic.LineOnlyReceiver): def connectionMade(self): self.lines = [] def lineReceived(self, line): if line: self.lines.append(line) elif self.lines: self.requestReceived(self.lines) self.lines = [] def requestReceived(self, lines): request = re.match('^(\w+) http://(.*?)(/.*?) 
HTTP/1..$', lines[0]) #print request.groups() method, host, path = request.groups() headers = dict(x.split(': ', 1) for x in lines[1:]) def callback(result): self.transport.write(str(result)) self.transport.loseConnection() def errback(err): err.printTraceback() self.transport.loseConnection() d = deferToGreenThread(http_request, method, host, path, headers=headers) d.addCallbacks(callback, errback) def http_request(method, host, path, headers): conn = httplib.HTTPConnection(host) conn.request(method, path, headers=headers) response = conn.getresponse() body = response.read() print method, host, path, response.status, response.reason, len(body) return format_response(response, body) def format_response(response, body): result = "HTTP/1.1 %s %s" % (response.status, response.reason) for k, v in response.getheaders(): result += '\r\n%s: %s' % (k, v) if body: result += '\r\n\r\n' result += body result += '\r\n' return result class MyFactory(Factory): protocol = LineOnlyReceiver print __doc__ reactor.listenTCP(8888, MyFactory()) reactor.run() diff --git a/examples/twisted_portforward.py b/examples/twisted_portforward.py index da7e3e7..20c19be 100755 --- a/examples/twisted_portforward.py +++ b/examples/twisted_portforward.py @@ -1,59 +1,59 @@ #!/usr/bin/python2 # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
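The proxy above illustrates the deferToGreenThread idiom: a blocking, green-friendly function (http_request) is invoked from a Twisted protocol callback and its result comes back on a Deferred. A stripped-down sketch of that idiom under the same assumptions as the example (join_reactor installs the twisted-based hub; fetch, done and failed are illustrative names):

# Sketch: call a blocking green function from reactor-driven code.
from twisted.internet import reactor
from eventlib.twistedutil import deferToGreenThread
from eventlib.twistedutil import join_reactor   # importing installs the twisted hub
from eventlib.green import httplib

def fetch(host, path):
    # runs in a greenlet, so httplib may block cooperatively
    conn = httplib.HTTPConnection(host)
    conn.request('GET', path)
    response = conn.getresponse()
    return response.status, len(response.read())

def done((status, length)):
    print 'got %s, %s bytes' % (status, length)
    reactor.stop()

def failed(err):
    err.printTraceback()
    reactor.stop()

d = deferToGreenThread(fetch, 'www.python.org', '/')
d.addCallbacks(done, failed)
reactor.run()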
"""Port forwarder USAGE: twisted_portforward.py local_port remote_host remote_port""" import sys from twisted.internet import reactor -from eventlet.twistedutil import join_reactor -from eventlet.twistedutil.protocol import GreenClientCreator, SpawnFactory, UnbufferedTransport -from eventlet import proc +from eventlib.twistedutil import join_reactor +from eventlib.twistedutil.protocol import GreenClientCreator, SpawnFactory, UnbufferedTransport +from eventlib import proc def forward(source, dest): try: while True: x = source.recv() if not x: break print 'forwarding %s bytes' % len(x) dest.write(x) finally: dest.loseConnection() def handler(local): client = str(local.getHost()) print 'accepted connection from %s' % client remote = GreenClientCreator(reactor, UnbufferedTransport).connectTCP(remote_host, remote_port) a = proc.spawn(forward, remote, local) b = proc.spawn(forward, local, remote) proc.waitall([a, b], trap_errors=True) print 'closed connection to %s' % client try: local_port, remote_host, remote_port = sys.argv[1:] except ValueError: sys.exit(__doc__) local_port = int(local_port) remote_port = int(remote_port) reactor.listenTCP(local_port, SpawnFactory(handler)) reactor.run() diff --git a/examples/twisted_server.py b/examples/twisted_server.py index a52895a..3e2890e 100644 --- a/examples/twisted_server.py +++ b/examples/twisted_server.py @@ -1,63 +1,63 @@ # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """Simple chat demo application. Listen on port 8007 and re-send all the data received to other participants. Demonstrates how to - * plug in eventlet into a twisted application (join_reactor) + * plug in eventlib into a twisted application (join_reactor) * how to use SpawnFactory to start a new greenlet for each new request. """ -from eventlet.twistedutil import join_reactor -from eventlet.twistedutil.protocol import SpawnFactory -from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport +from eventlib.twistedutil import join_reactor +from eventlib.twistedutil.protocol import SpawnFactory +from eventlib.twistedutil.protocols.basic import LineOnlyReceiverTransport class Chat: def __init__(self): self.participants = [] def handler(self, conn): peer = conn.getPeer() print 'new connection from %s' % (peer, ) conn.write("Welcome! 
There're %s participants already\n" % (len(self.participants))) self.participants.append(conn) try: for line in conn: if line: print 'received from %s: %s' % (peer, line) for buddy in self.participants: if buddy is not conn: buddy.sendline('from %s: %s' % (peer, line)) except Exception, ex: print peer, ex else: print peer, 'connection done' finally: conn.loseConnection() self.participants.remove(conn) print __doc__ chat = Chat() from twisted.internet import reactor reactor.listenTCP(8007, SpawnFactory(chat.handler, LineOnlyReceiverTransport)) reactor.run() diff --git a/examples/twisted_srvconnector.py b/examples/twisted_srvconnector.py index df60dd8..cc9b015 100644 --- a/examples/twisted_srvconnector.py +++ b/examples/twisted_srvconnector.py @@ -1,55 +1,55 @@ # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
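twisted_server.py above shows the SpawnFactory + LineOnlyReceiverTransport combination: every connection gets its own greenlet and the transport behaves like an iterator of received lines. A minimal sketch of the same pattern, reduced from the chat demo to an upper-casing echo (the handle function is illustrative):

# Sketch: one greenlet per connection; iterate over received lines.
from eventlib.twistedutil import join_reactor
from eventlib.twistedutil.protocol import SpawnFactory
from eventlib.twistedutil.protocols.basic import LineOnlyReceiverTransport
from twisted.internet import reactor

def handle(conn):
    peer = conn.getPeer()
    print 'connected: %s' % (peer, )
    try:
        for line in conn:            # blocks this greenlet only
            conn.sendline(line.upper())
    finally:
        conn.loseConnection()
        print 'gone: %s' % (peer, )

reactor.listenTCP(8007, SpawnFactory(handle, LineOnlyReceiverTransport))
reactor.run()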
from twisted.internet import reactor from twisted.names.srvconnect import SRVConnector from gnutls.interfaces.twisted import TLSContext, X509Credentials -from eventlet.twistedutil.protocol import GreenClientCreator -from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport +from eventlib.twistedutil.protocol import GreenClientCreator +from eventlib.twistedutil.protocols.basic import LineOnlyReceiverTransport class NoisySRVConnector(SRVConnector): def pickServer(self): host, port = SRVConnector.pickServer(self) print 'Resolved _%s._%s.%s --> %s:%s' % (self.service, self.protocol, self.domain, host, port) return host, port cred = X509Credentials(None, None) ctx = TLSContext(cred) creator = GreenClientCreator(reactor, LineOnlyReceiverTransport) conn = creator.connectSRV('msrps', 'ag-projects.com', connectFuncName='connectTLS', connectFuncArgs=(ctx,), ConnectorClass=NoisySRVConnector) request = """MSRP 49fh AUTH To-Path: msrps://alice@intra.example.com;tcp From-Path: msrps://alice.example.com:9892/98cjs;tcp -------49fh$ """.replace('\n', '\r\n') print 'Sending:\n%s' % request conn.write(request) print 'Received:' for x in conn: print repr(x) if '-------' in x: break diff --git a/examples/twisted_xcap_proxy.py b/examples/twisted_xcap_proxy.py index 77073d7..7271384 100644 --- a/examples/twisted_xcap_proxy.py +++ b/examples/twisted_xcap_proxy.py @@ -1,31 +1,31 @@ from twisted.internet.protocol import Factory from twisted.internet import reactor from twisted.protocols import basic from xcaplib.green import XCAPClient -from eventlet.twistedutil import deferToGreenThread -from eventlet.twistedutil import join_reactor +from eventlib.twistedutil import deferToGreenThread +from eventlib.twistedutil import join_reactor class LineOnlyReceiver(basic.LineOnlyReceiver): def lineReceived(self, line): print 'received: %r' % line if not line: return app, context, node = (line + ' ').split(' ', 3) context = {'u' : 'users', 'g': 'global'}.get(context, context) d = deferToGreenThread(client._get, app, node, globaltree=context=='global') def callback(result): self.transport.write(str(result)) def errback(error): self.transport.write(error.getTraceback()) d.addCallback(callback) d.addErrback(errback) class MyFactory(Factory): protocol = LineOnlyReceiver client = XCAPClient('https://xcap.sipthor.net/xcap-root', 'alice@example.com', '123') reactor.listenTCP(8007, MyFactory()) reactor.run() diff --git a/examples/wsgi.py b/examples/wsgi.py index 150fead..391ee26 100644 --- a/examples/wsgi.py +++ b/examples/wsgi.py @@ -1,20 +1,20 @@ -"""This is a simple example of running a wsgi application with eventlet. +"""This is a simple example of running a wsgi application with eventlib. 
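Both twisted_srvconnector.py and twisted_client.py rely on GreenClientCreator, which hands back a transport that can be read and written in blocking (greenlet-friendly) style while the reactor keeps running. A bare-bones TCP sketch assembled from the calls twisted_client.py itself makes (the reactor is started implicitly on the first blocking call):

# Sketch: blocking-style client I/O on top of the Twisted reactor.
from twisted.internet import reactor
from eventlib.twistedutil.protocol import GreenClientCreator

conn = GreenClientCreator(reactor).connectTCP('www.google.com', 80)
conn.write('HEAD / HTTP/1.0\r\n\r\n')
conn.loseWriteConnection()           # signal we are done sending
print conn.read()                    # read until the peer closes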
For a more fully-featured server which supports multiple processes, multiple threads, and graceful code reloading, see: http://pypi.python.org/pypi/Spawning/ """ -from eventlet import api, wsgi +from eventlib import api, wsgi def hello_world(env, start_response): if env['PATH_INFO'] != '/': start_response('404 Not Found', [('Content-Type', 'text/plain')]) return ['Not Found\r\n'] start_response('200 OK', [('Content-Type', 'text/plain')]) return ['Hello, World!\r\n'] wsgi.server(api.tcp_listener(('', 8080)), hello_world) diff --git a/greentest/__init__.py b/greentest/__init__.py index 7caef1d..39ed787 100644 --- a/greentest/__init__.py +++ b/greentest/__init__.py @@ -1,56 +1,56 @@ # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. # package is named greentest, not test, so it won't be confused with test in stdlib import sys import os import errno import unittest disabled_marker = '-*-*-*-*-*- disabled -*-*-*-*-*-' def exit_disabled(): sys.exit(disabled_marker) def exit_unless_twisted(): - from eventlet.api import get_hub + from eventlib.api import get_hub if 'Twisted' not in type(get_hub()).__name__: exit_disabled() def exit_unless_25(): if sys.version_info[:2]<(2, 5): exit_disabled() class LimitedTestCase(unittest.TestCase): def setUp(self): - from eventlet import api + from eventlib import api self.timer = api.exc_after(1, RuntimeError('test is taking too long')) def tearDown(self): self.timer.cancel() def find_command(command): for dir in os.getenv('PATH', '/usr/bin:/usr/sbin').split(os.pathsep): p = os.path.join(dir, command) if os.access(p, os.X_OK): return p raise IOError(errno.ENOENT, 'Command not found: %r' % command) diff --git a/greentest/api_test.py b/greentest/api_test.py index d22b564..6078371 100644 --- a/greentest/api_test.py +++ b/greentest/api_test.py @@ -1,226 +1,226 @@ # @author Donovan Preston # # Copyright (c) 2006-2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. 
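wsgi.py above serves a single hello-world application with wsgi.server on top of api.tcp_listener. A sketch of a slightly different application built from the same two calls, echoing the request path back to the client (show_path is an illustrative name):

# Sketch: minimal WSGI app served by eventlib's wsgi server.
from eventlib import api, wsgi

def show_path(env, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return ['You asked for %s\r\n' % env['PATH_INFO']]

wsgi.server(api.tcp_listener(('', 8080)), show_path)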
# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import os import os.path from unittest import TestCase, main -from eventlet import api -from eventlet import greenio -from eventlet import util +from eventlib import api +from eventlib import greenio +from eventlib import util def check_hub(): # Clear through the descriptor queue api.sleep(0) api.sleep(0) hub = api.get_hub() for nm in 'get_readers', 'get_writers', 'get_excs': dct = getattr(hub, nm)() assert not dct, "hub.%s not empty: %s" % (nm, dct) # Stop the runloop (unless it's twistedhub which does not support that) if not getattr(api.get_hub(), 'uses_twisted_reactor', None): api.get_hub().abort() api.sleep(0) ### ??? assert not api.get_hub().running class TestApi(TestCase): mode = 'static' certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') def test_tcp_listener(self): socket = api.tcp_listener(('0.0.0.0', 0)) assert socket.getsockname()[0] == '0.0.0.0' socket.close() check_hub() def test_connect_tcp(self): def accept_once(listenfd): try: conn, addr = listenfd.accept() fd = conn.makeGreenFile() conn.close() fd.write('hello\n') fd.close() finally: listenfd.close() server = api.tcp_listener(('0.0.0.0', 0)) api.spawn(accept_once, server) client = api.connect_tcp(('127.0.0.1', server.getsockname()[1])) fd = client.makeGreenFile() client.close() assert fd.readline() == 'hello\n' assert fd.read() == '' fd.close() check_hub() def test_connect_ssl(self): def accept_once(listenfd): try: conn, addr = listenfd.accept() fl = conn.makeGreenFile('w') fl.write('hello\r\n') fl.close() conn.close() finally: listenfd.close() server = api.ssl_listener(('0.0.0.0', 0), self.certificate_file, self.private_key_file) api.spawn(accept_once, server) client = util.wrap_ssl( api.connect_tcp(('127.0.0.1', server.getsockname()[1]))) client = client.makeGreenFile() assert client.readline() == 'hello\r\n' assert client.read() == '' client.close() def test_server(self): connected = [] server = api.tcp_listener(('0.0.0.0', 0)) bound_port = server.getsockname()[1] def accept_twice((conn, addr)): connected.append(True) conn.close() if len(connected) == 2: server.close() api.call_after(0, api.connect_tcp, ('127.0.0.1', bound_port)) api.call_after(0, api.connect_tcp, ('127.0.0.1', bound_port)) try: api.tcp_server(server, accept_twice) except: api.sleep(0.1) raise assert len(connected) == 2 check_hub() def test_001_trampoline_timeout(self): server = api.tcp_listener(('0.0.0.0', 0)) bound_port = server.getsockname()[1] try: desc = greenio.GreenSocket(util.tcp_socket()) desc.connect(('127.0.0.1', bound_port)) api.trampoline(desc, read=True, write=False, timeout=0.1) except api.TimeoutError: pass # test passed else: assert False, "Didn't timeout" check_hub() def test_timeout_cancel(self): server = api.tcp_listener(('0.0.0.0', 0)) bound_port = server.getsockname()[1] def client_connected((conn, addr)): conn.close() def go(): client = util.tcp_socket() desc = greenio.GreenSocket(client) desc.connect(('127.0.0.1', bound_port)) try: api.trampoline(desc, 
read=True, write=True, timeout=0.1) except api.TimeoutError: assert False, "Timed out" server.close() client.close() api.call_after(0, go) api.tcp_server(server, client_connected) check_hub() if not getattr(api.get_hub(), 'uses_twisted_reactor', None): def test_explicit_hub(self): oldhub = api.get_hub() try: api.use_hub(Foo) assert isinstance(api.get_hub(), Foo), api.get_hub() finally: api._threadlocal.hub = oldhub check_hub() def test_named(self): named_foo = api.named('api_test.Foo') self.assertEquals( named_foo.__name__, "Foo") def test_naming_missing_class(self): self.assertRaises( ImportError, api.named, 'this_name_should_hopefully_not_exist.Foo') def test_timeout_and_final_write(self): # This test verifies that a write on a socket that we've # stopped listening for doesn't result in an incorrect switch rpipe, wpipe = os.pipe() rfile = os.fdopen(rpipe,"r",0) wrap_rfile = greenio.GreenPipe(rfile) wfile = os.fdopen(wpipe,"w",0) wrap_wfile = greenio.GreenPipe(wfile) def sender(evt): api.sleep(0.02) wrap_wfile.write('hi') evt.send('sent via event') - from eventlet import coros + from eventlib import coros evt = coros.event() api.spawn(sender, evt) try: # try and get some data off of this pipe # but bail before any is sent api.exc_after(0.01, api.TimeoutError) _c = wrap_rfile.read(1) self.fail() except api.TimeoutError: pass result = evt.wait() self.assertEquals(result, 'sent via event') class Foo(object): pass if __name__ == '__main__': main() diff --git a/greentest/coros_test.py b/greentest/coros_test.py index 3121301..a43b5f7 100644 --- a/greentest/coros_test.py +++ b/greentest/coros_test.py @@ -1,219 +1,219 @@ # @author Donovan Preston, Ryan Williams # # Copyright (c) 2000-2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
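Several of the tests above guard against hangs with api.exc_after, which schedules an exception to be raised in the current greenlet after a delay and returns a timer that can be cancelled. A sketch of that guard pattern as LimitedTestCase and test_timeout_and_final_write use it (api.sleep stands in for real blocking work):

# Sketch: bound a potentially-blocking call with a scheduled exception.
from eventlib import api

timer = api.exc_after(0.5, api.TimeoutError)
try:
    api.sleep(1)                     # stands in for a blocking operation
except api.TimeoutError:
    print 'timed out as expected'
else:
    timer.cancel()                   # cancel the pending exception on success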
from unittest import TestCase, main -from eventlet import coros, api +from eventlib import coros, api class TestEvent(TestCase): mode = 'static' def setUp(self): # raise an exception if we're waiting forever self._cancel_timeout = api.exc_after(1, RuntimeError('test takes too long')) def tearDown(self): self._cancel_timeout.cancel() def test_waiting_for_event(self): evt = coros.event() value = 'some stuff' def send_to_event(): evt.send(value) api.spawn(send_to_event) self.assertEqual(evt.wait(), value) def test_multiple_waiters(self): evt = coros.event() value = 'some stuff' results = [] def wait_on_event(i_am_done): evt.wait() results.append(True) i_am_done.send() waiters = [] count = 5 for i in range(count): waiters.append(coros.event()) api.spawn(wait_on_event, waiters[-1]) evt.send() for w in waiters: w.wait() self.assertEqual(len(results), count) def test_reset(self): evt = coros.event() # calling reset before send should throw self.assertRaises(AssertionError, evt.reset) value = 'some stuff' def send_to_event(): evt.send(value) api.spawn(send_to_event) self.assertEqual(evt.wait(), value) # now try it again, and we should get the same exact value, # and we shouldn't be allowed to resend without resetting value2 = 'second stuff' self.assertRaises(AssertionError, evt.send, value2) self.assertEqual(evt.wait(), value) # reset and everything should be happy evt.reset() def send_to_event2(): evt.send(value2) api.spawn(send_to_event2) self.assertEqual(evt.wait(), value2) def test_double_exception(self): evt = coros.event() # send an exception through the event evt.send(exc=RuntimeError('from test_double_exception')) self.assertRaises(RuntimeError, evt.wait) evt.reset() # shouldn't see the RuntimeError again api.exc_after(0.001, api.TimeoutError('from test_double_exception')) self.assertRaises(api.TimeoutError, evt.wait) class IncrActor(coros.Actor): def received(self, evt): self.value = getattr(self, 'value', 0) + 1 if evt: evt.send() class TestActor(TestCase): mode = 'static' def setUp(self): # raise an exception if we're waiting forever self._cancel_timeout = api.exc_after(1, api.TimeoutError()) self.actor = IncrActor() def tearDown(self): self._cancel_timeout.cancel() api.kill(self.actor._killer) def test_cast(self): evt = coros.event() self.actor.cast(evt) evt.wait() evt.reset() self.assertEqual(self.actor.value, 1) self.actor.cast(evt) evt.wait() self.assertEqual(self.actor.value, 2) def test_cast_multi_1(self): # make sure that both messages make it in there evt = coros.event() evt1 = coros.event() self.actor.cast(evt) self.actor.cast(evt1) evt.wait() evt1.wait() self.assertEqual(self.actor.value, 2) def test_cast_multi_2(self): # the actor goes through a slightly different code path if it # is forced to enter its event loop prior to any cast()s api.sleep(0) self.test_cast_multi_1() def test_sleeping_during_received(self): # ensure that even if the received method cooperatively # yields, eventually all messages are delivered msgs = [] waiters = [] def received( (message, evt) ): api.sleep(0) msgs.append(message) evt.send() self.actor.received = received waiters.append(coros.event()) self.actor.cast( (1, waiters[-1])) api.sleep(0) waiters.append(coros.event()) self.actor.cast( (2, waiters[-1]) ) waiters.append(coros.event()) self.actor.cast( (3, waiters[-1]) ) api.sleep(0) waiters.append(coros.event()) self.actor.cast( (4, waiters[-1]) ) waiters.append(coros.event()) self.actor.cast( (5, waiters[-1]) ) for evt in waiters: evt.wait() self.assertEqual(msgs, [1,2,3,4,5]) def 
test_raising_received(self): msgs = [] def received( (message, evt) ): evt.send() if message == 'fail': raise RuntimeError() else: msgs.append(message) self.actor.received = received evt = coros.event() self.actor.cast( ('fail', evt) ) evt.wait() evt.reset() self.actor.cast( ('should_appear', evt) ) evt.wait() self.assertEqual(['should_appear'], msgs) def test_multiple(self): self.actor = IncrActor(concurrency=2) total = [0] def received( (func, ev, value) ): func() total[0] += value ev.send() self.actor.received = received def onemoment(): api.sleep(0.1) evt = coros.event() evt1 = coros.event() self.actor.cast( (onemoment, evt, 1) ) self.actor.cast( (lambda: None, evt1, 2) ) evt1.wait() self.assertEqual(total[0], 2) # both coroutines should have been used self.assertEqual(self.actor._pool.current_size, 2) api.sleep(0) self.assertEqual(self.actor._pool.free(), 1) evt.wait() self.assertEqual(total[0], 3) api.sleep(0) self.assertEqual(self.actor._pool.free(), 2) if __name__ == '__main__': main() diff --git a/greentest/db_pool_test.py b/greentest/db_pool_test.py index 7299b61..1bb06d3 100755 --- a/greentest/db_pool_test.py +++ b/greentest/db_pool_test.py @@ -1,534 +1,534 @@ #!/usr/bin/python2 # @file test_mysql_pool.py # @brief Test cases for mysql_pool # # Copyright (c) 2007, Linden Research, Inc. # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
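coros_test.py above exercises coros.event(), a one-shot result slot: any number of greenlets can wait() on it, a single send() (or send(exc=...)) wakes them all, and reset() re-arms it. A compact sketch of the send/wait handshake the tests rely on (the worker function is illustrative):

# Sketch: hand a value from one greenlet to another through an event.
from eventlib import api, coros

evt = coros.event()

def worker():
    api.sleep(0)                     # yield once, then deliver the result
    evt.send('computed value')

api.spawn(worker)
print evt.wait()                     # blocks this greenlet until send()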
from unittest import TestCase, main -from eventlet import api, coros -from eventlet import db_pool +from eventlib import api, coros +from eventlib import db_pool class DBTester(object): def setUp(self): self.create_db() self.connection = None connection = self._dbmodule.connect(**self._auth) cursor = connection.cursor() cursor.execute("""CREATE TABLE gargleblatz ( a INTEGER ) ENGINE = InnoDB;""") connection.commit() cursor.close() def tearDown(self): if self.connection: self.connection.close() self.drop_db() def set_up_test_table(self, connection = None): close_connection = False if connection is None: close_connection = True if self.connection is None: connection = self._dbmodule.connect(**self._auth) else: connection = self.connection cursor = connection.cursor() cursor.execute("""CREATE TEMPORARY TABLE test_table ( row_id INTEGER PRIMARY KEY AUTO_INCREMENT, value_int INTEGER, value_float FLOAT, value_string VARCHAR(200), value_uuid CHAR(36), value_binary BLOB, value_binary_string VARCHAR(200) BINARY, value_enum ENUM('Y','N'), created TIMESTAMP ) ENGINE = InnoDB;""") connection.commit() cursor.close() if close_connection: connection.close() # silly mock class class Mock(object): pass class TestDBConnectionPool(DBTester): def setUp(self): super(TestDBConnectionPool, self).setUp() self.pool = self.create_pool() self.connection = self.pool.get() def tearDown(self): if self.connection: self.pool.put(self.connection) super(TestDBConnectionPool, self).tearDown() def assert_cursor_works(self, cursor): # TODO: this is pretty mysql-specific cursor.execute("show full processlist") rows = cursor.fetchall() self.assert_(rows) def test_connecting(self): self.assert_(self.connection is not None) def test_create_cursor(self): cursor = self.connection.cursor() cursor.close() def test_run_query(self): cursor = self.connection.cursor() self.assert_cursor_works(cursor) cursor.close() def test_run_bad_query(self): cursor = self.connection.cursor() try: cursor.execute("garbage blah blah") self.assert_(False) except AssertionError: raise except Exception, e: pass cursor.close() def test_put_none(self): # the pool is of size 1, and its only connection is out self.assert_(self.pool.free() == 0) self.pool.put(None) # ha ha we fooled it into thinking that we had a dead process self.assert_(self.pool.free() == 1) conn2 = self.pool.get() self.assert_(conn2 is not None) self.assert_(conn2.cursor) del conn2 def test_close_does_a_put(self): self.assert_(self.pool.free() == 0) self.connection.close() self.assert_(self.pool.free() == 1) self.assertRaises(AttributeError, self.connection.cursor) def test_deletion_does_a_put(self): self.assert_(self.pool.free() == 0) self.connection = None self.assert_(self.pool.free() == 1) def test_put_doesnt_double_wrap(self): self.pool.put(self.connection) conn = self.pool.get() self.assert_(not isinstance(conn._base, db_pool.PooledConnectionWrapper)) def test_bool(self): self.assert_(self.connection) self.connection.close() self.assert_(not self.connection) def fill_test_table(self, conn): curs = conn.cursor() for i in range(1000): curs.execute('insert into test_table (value_int) values (%s)' % i) conn.commit() def test_returns_immediately(self): self.pool = self.create_pool() conn = self.pool.get() self.set_up_test_table(conn) self.fill_test_table(conn) curs = conn.cursor() results = [] SHORT_QUERY = "select * from test_table" evt = coros.event() def a_query(): self.assert_cursor_works(curs) curs.execute(SHORT_QUERY) results.append(2) evt.send() evt2 = coros.event() 
api.spawn(a_query) results.append(1) self.assertEqual([1], results) evt.wait() self.assertEqual([1, 2], results) def test_connection_is_clean_after_put(self): self.pool = self.create_pool() conn = self.pool.get() self.set_up_test_table(conn) curs = conn.cursor() for i in range(10): curs.execute('insert into test_table (value_int) values (%s)' % i) # do not commit :-) self.pool.put(conn) del conn conn2 = self.pool.get() curs2 = conn2.cursor() for i in range(10): curs2.execute('insert into test_table (value_int) values (%s)' % i) conn2.commit() rows = curs2.execute("select * from test_table") # we should have only inserted them once self.assertEqual(10, rows) def test_visibility_from_other_connections(self): self.pool = self.create_pool(3) conn = self.pool.get() conn2 = self.pool.get() curs = conn.cursor() try: curs2 = conn2.cursor() rows2 = curs2.execute("insert into gargleblatz (a) values (%s)" % (314159)) self.assertEqual(rows2, 1) conn2.commit() selection_query = "select * from gargleblatz" rows2 = curs2.execute(selection_query) self.assertEqual(rows2, 1) del curs2 del conn2 # create a new connection, it should see the addition conn3 = self.pool.get() curs3 = conn3.cursor() rows3 = curs3.execute(selection_query) self.assertEqual(rows3, 1) # now, does the already-open connection see it? rows = curs.execute(selection_query) self.assertEqual(rows, 1) finally: # clean up my litter curs.execute("delete from gargleblatz where a=314159") conn.commit() def test_two_simultaneous_connections(self): """ This test is timing-sensitive. """ self.pool = self.create_pool(2) conn = self.pool.get() self.set_up_test_table(conn) self.fill_test_table(conn) curs = conn.cursor() conn2 = self.pool.get() self.set_up_test_table(conn2) self.fill_test_table(conn2) curs2 = conn2.cursor() results = [] LONG_QUERY = "select * from test_table" SHORT_QUERY = "select * from test_table where row_id <= 20" evt = coros.event() def long_running_query(): self.assert_cursor_works(curs) curs.execute(LONG_QUERY) results.append(1) evt.send() evt2 = coros.event() def short_running_query(): self.assert_cursor_works(curs2) curs2.execute(SHORT_QUERY) results.append(2) evt2.send() api.spawn(long_running_query) api.spawn(short_running_query) evt.wait() evt2.wait() results.sort() self.assertEqual([1, 2], results) def test_clear(self): self.pool = self.create_pool() self.pool.put(self.connection) self.pool.clear() self.assertEqual(len(self.pool.free_items), 0) def test_unwrap_connection(self): self.assert_(isinstance(self.connection, db_pool.GenericConnectionWrapper)) conn = self.pool._unwrap_connection(self.connection) self.assert_(not isinstance(conn, db_pool.GenericConnectionWrapper)) self.assertEquals(None, self.pool._unwrap_connection(None)) self.assertEquals(None, self.pool._unwrap_connection(1)) # testing duck typing here -- as long as the connection has a # _base attribute, it should be unwrappable x = Mock() x._base = 'hi' self.assertEquals('hi', self.pool._unwrap_connection(x)) def test_safe_close(self): self.pool._safe_close(self.connection, quiet=True) self.assertEquals(len(self.pool.free_items), 1) self.pool._safe_close(None) self.pool._safe_close(1) # now we're really going for 100% coverage x = Mock() def fail(): raise KeyboardInterrupt() x.close = fail self.assertRaises(KeyboardInterrupt, self.pool._safe_close, x) x = Mock() def fail2(): raise RuntimeError("if this line has been printed, the test succeeded") x.close = fail2 self.pool._safe_close(x, quiet=False) def test_zero_max_idle(self): self.pool = 
self.create_pool(max_size=2, max_idle=0) self.connection = self.pool.get() self.connection.close() self.assertEquals(len(self.pool.free_items), 0) def test_zero_max_age(self): self.pool = self.create_pool(max_size=2, max_age=0) self.connection = self.pool.get() self.connection.close() self.assertEquals(len(self.pool.free_items), 0) def dont_test_max_idle(self): # This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. self.pool = self.create_pool(max_size=2, max_idle=0.02) self.connection = self.pool.get() self.connection.close() self.assertEquals(len(self.pool.free_items), 1) api.sleep(0.01) # not long enough to trigger the idle timeout self.assertEquals(len(self.pool.free_items), 1) self.connection = self.pool.get() self.connection.close() self.assertEquals(len(self.pool.free_items), 1) api.sleep(0.01) # idle timeout should have fired but done nothing self.assertEquals(len(self.pool.free_items), 1) self.connection = self.pool.get() self.connection.close() self.assertEquals(len(self.pool.free_items), 1) api.sleep(0.03) # long enough to trigger idle timeout for real self.assertEquals(len(self.pool.free_items), 0) def dont_test_max_idle_many(self): # This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. self.pool = self.create_pool(max_size=2, max_idle=0.02) self.connection, conn2 = self.pool.get(), self.pool.get() self.connection.close() api.sleep(0.01) self.assertEquals(len(self.pool.free_items), 1) conn2.close() self.assertEquals(len(self.pool.free_items), 2) api.sleep(0.02) # trigger cleanup of conn1 but not conn2 self.assertEquals(len(self.pool.free_items), 1) def dont_test_max_age(self): # This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. self.pool = self.create_pool(max_size=2, max_age=0.05) self.connection = self.pool.get() self.connection.close() self.assertEquals(len(self.pool.free_items), 1) api.sleep(0.01) # not long enough to trigger the age timeout self.assertEquals(len(self.pool.free_items), 1) self.connection = self.pool.get() self.connection.close() self.assertEquals(len(self.pool.free_items), 1) api.sleep(0.05) # long enough to trigger age timeout self.assertEquals(len(self.pool.free_items), 0) def dont_test_max_age_many(self): # This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. 
self.pool = self.create_pool(max_size=2, max_age=0.15) self.connection, conn2 = self.pool.get(), self.pool.get() self.connection.close() self.assertEquals(len(self.pool.free_items), 1) api.sleep(0) # not long enough to trigger the age timeout self.assertEquals(len(self.pool.free_items), 1) api.sleep(0.2) # long enough to trigger age timeout self.assertEquals(len(self.pool.free_items), 0) conn2.close() # should not be added to the free items self.assertEquals(len(self.pool.free_items), 0) def test_connection_timeout(self): # use a nonexistent ip address -- this one is reserved by IANA self._auth['host'] = '192.0.2.1' pool = self.create_pool() self.assertRaises(db_pool.ConnectTimeout, pool.get) def test_waiters_get_woken(self): # verify that when there's someone waiting on an empty pool # and someone puts an immediately-closed connection back in # the pool that the waiter gets woken self.pool = self.create_pool(max_size=1, max_age=0) conn = self.pool.get() self.assertEquals(self.pool.free(), 0) self.assertEquals(self.pool.waiting(), 0) e = coros.event() def retrieve(pool, ev): c = pool.get() ev.send(c) api.spawn(retrieve, self.pool, e) api.sleep(0) # these two sleeps should advance the retrieve api.sleep(0) # coroutine until it's waiting in get() self.assertEquals(self.pool.free(), 0) self.assertEquals(self.pool.waiting(), 1) self.pool.put(conn) timer = api.exc_after(0.3, api.TimeoutError) conn = e.wait() timer.cancel() self.assertEquals(self.pool.free(), 0) self.assertEquals(self.pool.waiting(), 0) def dont_test_0_straight_benchmark(self): """ Benchmark; don't run unless you want to wait a while.""" import time iterations = 20000 c = self.connection.cursor() self.connection.commit() def bench(c): for i in xrange(iterations): c.execute('select 1') bench(c) # warm-up results = [] for i in xrange(3): start = time.time() bench(c) end = time.time() results.append(end-start) print "\n%u iterations took an average of %f seconds, (%s) in %s\n" % ( iterations, sum(results)/len(results), results, type(self)) def test_raising_create(self): # if the create() method raises an exception the pool should # not lose any connections self.pool = self.create_pool(max_size=1, module=RaisingDBModule()) self.assertRaises(RuntimeError, self.pool.get) self.assertEquals(self.pool.free(), 1) class RaisingDBModule(object): def connect(self, *args, **kw): raise RuntimeError() class TestTpoolConnectionPool(TestDBConnectionPool): def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout=0.5, module=None): if module is None: module = self._dbmodule return db_pool.TpooledConnectionPool(module, min_size=0, max_size=max_size, max_idle=max_idle, max_age=max_age, connect_timeout = connect_timeout, **self._auth) class TestSaranwrapConnectionPool(TestDBConnectionPool): def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout= 0.5, module=None): if module is None: module = self._dbmodule return db_pool.SaranwrappedConnectionPool(module, min_size=0, max_size=max_size, max_idle=max_idle, max_age=max_age, connect_timeout=connect_timeout, **self._auth) def test_raising_create(self): # *TODO: this fails because of saranwrap's unwillingness to # wrap objects in tests, but it should be fixable pass class TestRawConnectionPool(TestDBConnectionPool): def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout= 0.5, module=None): if module is None: module = self._dbmodule return db_pool.RawConnectionPool(module, min_size=0, max_size=max_size, max_idle=max_idle, 
max_age=max_age, connect_timeout=connect_timeout, **self._auth) def test_connection_timeout(self): pass # not gonna work for raw connections because they're not nonblocking class TestMysqlConnectionPool(object): def setUp(self): import MySQLdb self._dbmodule = MySQLdb try: import simplejson import os.path auth_utf8 = simplejson.load(open(os.path.join(os.path.dirname(__file__), 'auth.json'))) # have to convert unicode objects to str objects because mysqldb is dum self._auth = dict([(str(k), str(v)) for k, v in auth_utf8.items()]) except (IOError, ImportError), e: self._auth = {'host': 'localhost','user': 'root','passwd': '','db': 'persist0'} super(TestMysqlConnectionPool, self).setUp() def tearDown(self): pass def create_db(self): auth = self._auth.copy() try: self.drop_db() except Exception: pass dbname = auth.pop('db') db = self._dbmodule.connect(**auth).cursor() db.execute("create database "+dbname) db.close() del db def drop_db(self): db = self._dbmodule.connect(**self._auth).cursor() db.execute("drop database "+self._auth['db']) db.close() del db # for some reason the tpool test hangs if run after the saranwrap test class Test01MysqlTpool(TestMysqlConnectionPool, TestTpoolConnectionPool, TestCase): pass class Test02MysqlSaranwrap(TestMysqlConnectionPool, TestSaranwrapConnectionPool, TestCase): pass class Test03MysqlRaw(TestMysqlConnectionPool, TestRawConnectionPool, TestCase): pass if __name__ == '__main__': try: import MySQLdb except ImportError: print "Unable to import MySQLdb, skipping db_pool_test." else: main() else: import MySQLdb diff --git a/greentest/generate_report.py b/greentest/generate_report.py index 926dd72..94b38f4 100755 --- a/greentest/generate_report.py +++ b/greentest/generate_report.py @@ -1,235 +1,235 @@ #!/usr/bin/python2 # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
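The db_pool tests above build pools such as TpooledConnectionPool around a DB-API module and then get()/put() wrapped connections; closing a wrapped connection also returns it to the pool. A sketch of that lifecycle, assuming MySQLdb and the same keyword arguments the tests pass (the credentials shown are the tests' fallback defaults, not anything you should reuse):

# Sketch: checked-out connections go back to the pool on put() or close().
import MySQLdb
from eventlib import db_pool

pool = db_pool.TpooledConnectionPool(MySQLdb, min_size=0, max_size=2,
                                     max_idle=10, max_age=10,
                                     connect_timeout=0.5,
                                     host='localhost', user='root',
                                     passwd='', db='persist0')
conn = pool.get()
try:
    cursor = conn.cursor()
    cursor.execute('select 1')
    print cursor.fetchall()
    cursor.close()
finally:
    pool.put(conn)                   # or conn.close(), which does a put()
print 'idle connections:', pool.free()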
import sys import os import sqlite3 import glob -REPO_URL = 'http://bitbucket.org/denis/eventlet' +REPO_URL = 'http://bitbucket.org/denis/eventlib' hubs_order = ['poll', 'selects', 'libevent', 'libev', 'twistedr/selectreactor', 'twistedr/pollreactor', 'twistedr/epollreactor'] def make_table(database): c = sqlite3.connect(database) res = c.execute(('select command_record.id, testname, hub, runs, errors, fails, ' 'timeouts, exitcode, stdout from parsed_command_record join ' 'command_record on parsed_command_record.id=command_record.id ')).fetchall() table = {} # testname -> hub -> test_result (runs, errors, fails, timeouts) tests = set() for id, testname, hub, runs, errors, fails, timeouts, exitcode, stdout in res: tests.add(testname) test_result = TestResult(runs, errors, fails, timeouts, exitcode, id, stdout) table.setdefault(testname, {})[hub] = test_result return table, sorted(tests) def calc_hub_stats(table): hub_stats = {} # hub -> cumulative test_result for testname in table: for hub in table[testname]: test_result = table[testname][hub] hub_stats.setdefault(hub, TestResult(0,0,0,0)).__iadd__(test_result) hubs = hub_stats.items() hub_names = sorted(hub_stats.keys()) def get_order(hub): try: return hubs_order.index(hub) except ValueError: return 100 + hub_names.index(hub) hubs.sort(key=lambda (hub, stats): get_order(hub)) return hub_stats, [x[0] for x in hubs] class TestResult: def __init__(self, runs, errors, fails, timeouts, exitcode=None, id=None, output=None): self.runs = max(runs, 0) self.errors = max(errors, 0) self.fails = max(fails, 0) self.timeouts = max(timeouts, 0) self.exitcode = exitcode self.id = id self.output = output @property def passed(self): return max(0, self.runs - self.errors - self.fails) @property def failed(self): return self.errors + self.fails @property def total(self): return self.runs + self.timeouts @property def percentage(self): return float(self.passed) / self.total def __iadd__(self, other): self.runs += other.runs self.errors += other.errors self.fails += other.fails self.timeouts += other.timeouts if self.exitcode != other.exitcode: self.exitcode = None self.id = None self.output = None def color(self): if self.id is None: return 'white' if self.timeouts or self.exitcode in [7, 9, 10]: return 'red' elif self.errors or self.fails or self.exitcode: return 'yellow' else: return '"#72ff75"' def warnings(self): r = [] if not self.failed and not self.timeouts: if self.exitcode in [7, 9, 10]: r += ['TIMEOUT'] if self.exitcode: r += ['exitcode=%s' % self.exitcode] if self.output is not None: output = self.output.lower() warning = output.count('warning') if warning: r += ['%s warnings' % warning] tracebacks = output.count('traceback') if tracebacks: r += ['%s tracebacks' % tracebacks] return r def text(self): errors = [] if self.fails: errors += ['%s failed' % self.fails] if self.errors: errors += ['%s raised' % self.errors] if self.timeouts: errors += ['%s timeout' % self.timeouts] errors += self.warnings() if self.id is None: errors += ['
%s total' % self.total] return '\n'.join(["%s passed" % self.passed] + errors).replace(' ', ' ') # shorter passed/failed/raised/timeout def text_short(self): r = '%s/%s/%s' % (self.passed, self.failed, self.timeouts) if self.warnings(): r += '\n' + '\n'.join(self.warnings()).replace(' ', ' ') return r def format(self): text = self.text().replace('\n', '
\n') if self.id is None: valign = 'bottom' else: text = '%s' % (self.id, text) valign = 'center' return '%s' % (valign, self.color(), text) def format_testname(changeset, test): return '%s' % (REPO_URL, changeset, test, test) def format_table(table, hubs, tests, hub_stats, changeset): r = '\n\n\n' % hub r += '\n' r += '' for hub in hubs: test_result = hub_stats.get(hub) if test_result is None: r += '' else: r += test_result.format() + '\n' r += '' r += '' % (len(hubs)+1) for test in tests: r += '' % format_testname(changeset, test) for hub in hubs: test_result = table[test].get(hub) if test_result is None: r += '' else: r += test_result.format() + '\n' r += '' r += '
\n' for hub in hubs: r += '%s
Totalno data
%sno data
' return r def format_header(rev, changeset, pyversion): result = '
' if REPO_URL is None: result += 'Eventlet changeset %s: %s' % (rev, changeset) else: url = '%s/changeset/%s' % (REPO_URL, changeset) result += 'Eventlet changeset %s: %s' % (url, rev, changeset) result += '
Python version: %s

' % pyversion return result def format_html(table, rev, changeset, pyversion): r = '' r += format_header(rev, changeset, pyversion) r += table r += '' return r def generate_raw_results(path, database): c = sqlite3.connect(database) res = c.execute('select id, stdout from command_record').fetchall() for id, out in res: file(os.path.join(path, '%s.txt' % id), 'w').write(out.encode('utf-8')) sys.stderr.write('.') sys.stderr.write('\n') def main(db): full_changeset = '.'.join(db.split('.')[1:-1]) rev, changeset, pyversion = full_changeset.split('_') table, tests = make_table(db) hub_stats, hubs = calc_hub_stats(table) report = format_html(format_table(table, hubs, tests, hub_stats, changeset), rev, changeset, pyversion) path = '../htmlreports/%s' % full_changeset try: os.makedirs(path) except OSError, ex: if 'File exists' not in str(ex): raise file(path + '/index.html', 'w').write(report) generate_raw_results(path, db) if __name__=='__main__': if not sys.argv[1:]: latest_db = sorted(glob.glob('results.*.db'), key=lambda f: os.stat(f).st_mtime)[-1] print latest_db sys.argv.append(latest_db) for db in sys.argv[1:]: main(db) diff --git a/greentest/greenio_test.py b/greentest/greenio_test.py index 680416c..86fc508 100644 --- a/greentest/greenio_test.py +++ b/greentest/greenio_test.py @@ -1,101 +1,101 @@ # Copyright (c) 2006-2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
from unittest import TestCase, main -from eventlet import api +from eventlib import api import socket # TODO try and reuse unit tests from within Python itself class TestGreenIo(TestCase): def test_close_with_makefile(self): def accept_close_early(listener): # verify that the makefile and the socket are truly independent # by closing the socket prior to using the made file try: conn, addr = listener.accept() fd = conn.makeGreenFile() conn.close() fd.write('hello\n') fd.close() self.assertRaises(socket.error, fd.write, 'a') self.assertRaises(socket.error, conn.send, 'b') finally: listener.close() def accept_close_late(listener): # verify that the makefile and the socket are truly independent # by closing the made file and then sending a character try: conn, addr = listener.accept() fd = conn.makeGreenFile() fd.write('hello') fd.close() conn.send('\n') conn.close() self.assertRaises(socket.error, fd.write, 'a') self.assertRaises(socket.error, conn.send, 'b') finally: listener.close() def did_it_work(server): client = api.connect_tcp(('127.0.0.1', server.getsockname()[1])) fd = client.makeGreenFile() client.close() assert fd.readline() == 'hello\n' assert fd.read() == '' fd.close() server = api.tcp_listener(('0.0.0.0', 0)) killer = api.spawn(accept_close_early, server) did_it_work(server) api.kill(killer) server = api.tcp_listener(('0.0.0.0', 0)) killer = api.spawn(accept_close_late, server) did_it_work(server) api.kill(killer) def test_del_closes_socket(self): timer = api.exc_after(0.5, api.TimeoutError) def accept_once(listener): # delete/overwrite the original conn # object, only keeping the file object around # closing the file object should close everything try: conn, addr = listener.accept() conn = conn.makeGreenFile() conn.write('hello\n') conn.close() self.assertRaises(socket.error, conn.write, 'a') finally: listener.close() server = api.tcp_listener(('0.0.0.0', 0)) killer = api.spawn(accept_once, server) client = api.connect_tcp(('127.0.0.1', server.getsockname()[1])) fd = client.makeGreenFile() client.close() assert fd.read() == 'hello\n' assert fd.read() == '' timer.cancel() if __name__ == '__main__': main() diff --git a/greentest/httpc_test.py b/greentest/httpc_test.py index 4c7a157..927ce89 100644 --- a/greentest/httpc_test.py +++ b/greentest/httpc_test.py @@ -1,461 +1,461 @@ # @author Bryan O'Sullivan # # Copyright (c) 2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
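The httpc tests below drive the client against a small WSGI site served with wsgi.server. As a rough sketch of that setup, assuming only the call signatures visible in the tests (the hello_app application here is illustrative, not part of the suite):

from StringIO import StringIO
from eventlib import api, httpc, wsgi

def hello_app(environ, start_response):
    # trivial WSGI application, illustrative only
    start_response('200 OK', [('Content-type', 'text/plain')])
    return ['hello world']

api.spawn(wsgi.server, api.tcp_listener(('0.0.0.0', 31337)),
          hello_app, log=StringIO(), max_size=128)
print httpc.get('http://localhost:31337/')             # 'hello world'
status, msg, body = httpc.get_('http://localhost:31337/')
print status, body                                     # 200 hello world
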
import cgi from unittest import TestCase, main -from eventlet import api -from eventlet import httpc -from eventlet import wsgi +from eventlib import api +from eventlib import httpc +from eventlib import wsgi import time try: from cStringIO import StringIO except ImportError: from StringIO import StringIO class Site(object): def __init__(self): self.stuff = {'hello': 'hello world'} def __call__(self, env, start_response): return getattr(self, 'handle_%s' % env['REQUEST_METHOD'].lower())(env, start_response) def _get_query_pairs(env): parsed = cgi.parse_qs(env['QUERY_STRING']) for key, values in parsed.items(): for val in values: yield key, val def get_query_pairs(env): return list(_get_query_pairs(env)) class BasicSite(Site): def handle_get(self, env, start_response): headers = [('x-get', 'hello'), ('Content-type', 'text/plain')] resp = StringIO() path = env['PATH_INFO'].lstrip('/') try: resp.write(self.stuff[path]) except KeyError: start_response("404 Not Found", headers) return ["Not Found"] for k,v in get_query_pairs(env): resp.write(k + '=' + v + '\n') start_response("200 OK", headers) return [resp.getvalue()] def handle_head(self, env, start_response): headers = [('x-head', 'hello'), ('Content-type', 'text/plain')] start_response("200 OK", headers) return [""] def handle_put(self, env, start_response): headers = [('x-put', 'hello'), ('Content-type', 'text/plain')] path = env['PATH_INFO'].lstrip('/') if not path: start_response("400 Bad Request", headers) return [""] if path in self.stuff: start_response("204 No Content", headers) else: start_response("201 Created", headers) self.stuff[path] = env['wsgi.input'].read(int(env.get('CONTENT_LENGTH', '0'))) return [""] def handle_delete(self, env, start_response): headers = [('x-delete', 'hello'), ('Content-type', 'text/plain')] path = env['PATH_INFO'].lstrip('/') if not path: start_response("400 Bad Request", headers) return [""] try: del self.stuff[path] start_response("204 No Content", headers) except KeyError: start_response("404 Not Found", headers) return [""] def handle_post(self, env, start_response): headers = [('x-post', 'hello'), ('Content-type', 'text/plain')] start_response("200 OK", headers) return [env['wsgi.input'].read(int(env.get('CONTENT_LENGTH', '0')))] class TestBase(object): site_class = BasicSite def base_url(self): return 'http://localhost:31337/' def setUp(self): self.logfile = StringIO() self.victim = api.spawn(wsgi.server, api.tcp_listener(('0.0.0.0', 31337)), self.site_class(), log=self.logfile, max_size=128) def tearDown(self): api.kill(self.victim) class TestHttpc(TestBase, TestCase): def test_get_bad_uri(self): self.assertRaises(httpc.NotFound, lambda: httpc.get(self.base_url() + 'b0gu5')) def test_get(self): response = httpc.get(self.base_url() + 'hello') self.assertEquals(response, 'hello world') def test_get_(self): status, msg, body = httpc.get_(self.base_url() + 'hello') self.assertEquals(status, 200) self.assertEquals(msg.dict['x-get'], 'hello') self.assertEquals(body, 'hello world') def test_get_query(self): response = httpc.get(self.base_url() + 'hello?foo=bar&foo=quux') self.assertEquals(response, 'hello worldfoo=bar\nfoo=quux\n') def test_head_(self): status, msg, body = httpc.head_(self.base_url() + 'hello') self.assertEquals(status, 200) self.assertEquals(msg.dict['x-head'], 'hello') self.assertEquals(body, '') def test_head(self): self.assertEquals(httpc.head(self.base_url() + 'hello'), '') def test_post_(self): data = 'qunge' status, msg, body = httpc.post_(self.base_url() + '', data=data) 
self.assertEquals(status, 200) self.assertEquals(msg.dict['x-post'], 'hello') self.assertEquals(body, data) def test_post(self): data = 'qunge' self.assertEquals(httpc.post(self.base_url() + '', data=data), data) def test_put_bad_uri(self): self.assertRaises( httpc.BadRequest, lambda: httpc.put(self.base_url() + '', data='')) def test_put_empty(self): httpc.put(self.base_url() + 'empty', data='') self.assertEquals(httpc.get(self.base_url() + 'empty'), '') def test_put_nonempty(self): data = 'nonempty' httpc.put(self.base_url() + 'nonempty', data=data) self.assertEquals(httpc.get(self.base_url() + 'nonempty'), data) def test_put_01_create(self): data = 'goodbye world' status, msg, body = httpc.put_(self.base_url() + 'goodbye', data=data) self.assertEquals(status, 201) self.assertEquals(msg.dict['x-put'], 'hello') self.assertEquals(body, '') self.assertEquals(httpc.get(self.base_url() + 'goodbye'), data) def test_put_02_modify(self): self.test_put_01_create() data = 'i really mean goodbye' status = httpc.put_(self.base_url() + 'goodbye', data=data)[0] self.assertEquals(status, 204) self.assertEquals(httpc.get(self.base_url() + 'goodbye'), data) def test_delete_(self): httpc.put(self.base_url() + 'killme', data='killme') status, msg, body = httpc.delete_(self.base_url() + 'killme') self.assertEquals(status, 204) self.assertRaises( httpc.NotFound, lambda: httpc.get(self.base_url() + 'killme')) def test_delete(self): httpc.put(self.base_url() + 'killme', data='killme') self.assertEquals(httpc.delete(self.base_url() + 'killme'), '') self.assertRaises( httpc.NotFound, lambda: httpc.get(self.base_url() + 'killme')) def test_delete_bad_uri(self): self.assertRaises( httpc.NotFound, lambda: httpc.delete(self.base_url() + 'b0gu5')) class RedirectSite(BasicSite): response_code = "301 Moved Permanently" def __call__(self, env, start_response): path = env['PATH_INFO'] if path.startswith('/redirect/'): url = 'http://' + env['HTTP_HOST'] + path.replace('/redirect/', '/') start_response(self.response_code, [("Location", url)]) return [""] return super(RedirectSite, self).__call__(env, start_response) class Site301(RedirectSite): pass class Site302(BasicSite): def __call__(self, env, start_response): path = env['PATH_INFO'] if path.startswith('/expired/'): url = 'http://' + env['HTTP_HOST'] + path.replace('/expired/', '/') headers = [('location', url), ('expires', '0')] start_response("302 Found", headers) return [""] if path.startswith('/expires/'): url = 'http://' + env['HTTP_HOST'] + path.replace('/expires/', '/') expires = time.time() + (100 * 24 * 60 * 60) headers = [('location', url), ('expires', httpc.to_http_time(expires))] start_response("302 Found", headers) return [""] return super(Site302, self).__call__(env, start_response) class Site303(RedirectSite): response_code = "303 See Other" class Site307(RedirectSite): response_code = "307 Temporary Redirect" class TestHttpc301(TestBase, TestCase): site_class = Site301 def base_url(self): return 'http://localhost:31337/redirect/' def test_get(self): try: httpc.get(self.base_url() + 'hello', max_retries=0) self.assert_(False) except httpc.MovedPermanently, err: response = err.retry() self.assertEquals(response, 'hello world') self.assertEquals(httpc.get(self.base_url() + 'hello', max_retries=1), 'hello world') def test_post(self): data = 'qunge' try: response = httpc.post(self.base_url() + '', data=data) self.assert_(False) except httpc.MovedPermanently, err: response = err.retry() self.assertEquals(response, data) class TestHttpc302(TestBase, 
TestCase): site_class = Site302 def test_get_expired(self): try: httpc.get(self.base_url() + 'expired/hello', max_retries=0) self.assert_(False) except httpc.Found, err: response = err.retry() self.assertEquals(response, 'hello world') self.assertEquals(httpc.get(self.base_url() + 'expired/hello', max_retries=1), 'hello world') def test_get_expires(self): try: httpc.get(self.base_url() + 'expires/hello', max_retries=0) self.assert_(False) except httpc.Found, err: response = err.retry() self.assertEquals(response, 'hello world') self.assertEquals(httpc.get(self.base_url() + 'expires/hello', max_retries=1), 'hello world') class TestHttpc303(TestBase, TestCase): site_class = Site303 def base_url(self): return 'http://localhost:31337/redirect/' def test_post(self): data = 'hello world' try: response = httpc.post(self.base_url() + 'hello', data=data) self.assert_(False) except httpc.SeeOther, err: response = err.retry() self.assertEquals(response, data) class TestHttpc307(TestBase, TestCase): site_class = Site307 def base_url(self): return 'http://localhost:31337/redirect/' def test_post(self): data = 'hello world' try: response = httpc.post(self.base_url() + 'hello', data=data) self.assert_(False) except httpc.TemporaryRedirect, err: response = err.retry() self.assertEquals(response, data) class Site500(BasicSite): def __call__(self, env, start_response): start_response("500 Internal Server Error", [("Content-type", "text/plain")]) return ["screw you world"] class TestHttpc500(TestBase, TestCase): site_class = Site500 def base_url(self): return 'http://localhost:31337/' def test_get(self): data = 'screw you world' try: response = httpc.get(self.base_url()) self.fail() except httpc.InternalServerError, e: self.assertEquals(e.params.response_body, data) self.assert_(str(e).count(data)) self.assert_(repr(e).count(data)) class Site504(BasicSite): def __call__(self, env, start_response): start_response("504 Gateway Timeout", [("Content-type", "text/plain")]) return ["screw you world"] class TestHttpc504(TestBase, TestCase): site_class = Site504 def base_url(self): return 'http://localhost:31337/' def test_post(self): # Simply ensure that a 504 status code results in a # GatewayTimeout. Don't bother retrying. 
data = 'hello world' self.assertRaises(httpc.GatewayTimeout, lambda: httpc.post(self.base_url(), data=data)) class TestHttpTime(TestCase): rfc1123_time = 'Sun, 06 Nov 1994 08:49:37 GMT' rfc850_time = 'Sunday, 06-Nov-94 08:49:37 GMT' asctime_time = 'Sun Nov 6 08:49:37 1994' secs_since_epoch = 784111777 def test_to_http_time(self): self.assertEqual(self.rfc1123_time, httpc.to_http_time(self.secs_since_epoch)) def test_from_http_time(self): for formatted in (self.rfc1123_time, self.rfc850_time, self.asctime_time): ticks = httpc.from_http_time(formatted, 0) self.assertEqual(ticks, self.secs_since_epoch) class TestProxy(TestCase): def test_ssl_proxy(self): def ssl_proxy(sock): conn, addr = sock.accept() fd = conn.makefile() try: line = request = fd.readline() self.assertEqual(request, 'GET https://localhost:1234 HTTP/1.1\r\n') while line.strip(): # eat request headers line = fd.readline() # we're not going to actually proxy to localhost:1234, # we're just going to return a response on its behalf fd.write("HTTP/1.0 200 OK\r\n\r\n") finally: fd.close() conn.close() server = api.tcp_listener(('0.0.0.0', 5505)) api.spawn(ssl_proxy, server) import os os.environ['ALL_PROXY'] = 'localhost:5505' httpc.get('https://localhost:1234', ok=[200], use_proxy=True) def test_ssl_proxy_redirects(self): # make sure that if the proxy returns a redirect, that httpc # successfully follows it (this was broken at one point) def ssl_proxy(sock): conn, addr = sock.accept() fd = conn.makefile() try: line = request = fd.readline() self.assertEqual(request, 'GET https://localhost:1234 HTTP/1.1\r\n') while line.strip(): # eat request headers line = fd.readline() # we're not going to actually proxy to localhost:1234, # we're just going to return a response on its behalf fd.write("HTTP/1.0 302 Found\r\nLocation: https://localhost:1234/2\r\n\r\n") finally: fd.close() conn.close() # second request, for /2 target conn, addr = sock.accept() fd = conn.makefile() try: line = request = fd.readline() self.assertEqual(request, 'GET https://localhost:1234/2 HTTP/1.1\r\n') while line.strip(): # eat request headers line = fd.readline() fd.write("HTTP/1.0 200 OK\r\n\r\n") finally: fd.close() conn.close() sock.close() server = api.tcp_listener(('0.0.0.0', 5505)) api.spawn(ssl_proxy, server) import os os.environ['ALL_PROXY'] = 'localhost:5505' httpc.get('https://localhost:1234', use_proxy=True, max_retries=1) if __name__ == '__main__': main() diff --git a/greentest/httpd_test.py b/greentest/httpd_test.py index f34acfc..828df5c 100644 --- a/greentest/httpd_test.py +++ b/greentest/httpd_test.py @@ -1,210 +1,210 @@ # @author Donovan Preston # # Copyright (c) 2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. from unittest import TestCase, main -from eventlet import api -from eventlet import httpd -from eventlet import processes -from eventlet import util +from eventlib import api +from eventlib import httpd +from eventlib import processes +from eventlib import util from greentest import find_command try: from cStringIO import StringIO except ImportError: from StringIO import StringIO util.wrap_socket_with_coroutine_socket() class Site(object): def handle_request(self, req): path = req.path_segments() if len(path) > 0 and path[0] == "notexist": req.response(404, body='not found') return req.write('hello world') def adapt(self, obj, req): req.write(str(obj)) CONTENT_LENGTH = 'content-length' """ HTTP/1.1 200 OK Date: foo Content-length: 11 hello world """ class ConnectionClosed(Exception): pass def read_http(sock): fd = sock.makeGreenFile() response_line = fd.readline() if not response_line: raise ConnectionClosed raw_headers = fd.readuntil('\r\n\r\n').strip() #print "R", response_line, raw_headers headers = dict() for x in raw_headers.split('\r\n'): #print "X", x key, value = x.split(': ', 1) headers[key.lower()] = value if CONTENT_LENGTH in headers: num = int(headers[CONTENT_LENGTH]) body = fd.read(num) #print body else: body = None return response_line, headers, body class TestHttpd(TestCase): mode = 'static' def setUp(self): self.logfile = StringIO() self.site = Site() self.killer = api.spawn( httpd.server, api.tcp_listener(('0.0.0.0', 12346)), self.site, max_size=128, log=self.logfile) def tearDown(self): api.kill(self.killer) def test_001_server(self): sock = api.connect_tcp( ('127.0.0.1', 12346)) fd = sock.makeGreenFile() fd.write('GET / HTTP/1.0\r\nHost: localhost\r\n\r\n') result = fd.read() fd.close() ## The server responds with the maximum version it supports self.assert_(result.startswith('HTTP'), result) self.assert_(result.endswith('hello world')) def test_002_keepalive(self): sock = api.connect_tcp( ('127.0.0.1', 12346)) fd = sock.makeGreenFile() fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') read_http(sock) fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') read_http(sock) fd.close() def test_003_passing_non_int_to_read(self): # This should go in greenio_test sock = api.connect_tcp( ('127.0.0.1', 12346)) fd = sock.makeGreenFile() fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') cancel = api.exc_after(1, RuntimeError) self.assertRaises(TypeError, fd.read, "This shouldn't work") cancel.cancel() fd.close() def test_004_close_keepalive(self): sock = api.connect_tcp( ('127.0.0.1', 12346)) fd = sock.makeGreenFile() fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') read_http(sock) fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') read_http(sock) fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') self.assertRaises(ConnectionClosed, read_http, sock) fd.close() def skip_test_005_run_apachebench(self): url = 'http://localhost:12346/' # ab is apachebench out = processes.Process(find_command('ab'), ['-c','64','-n','1024', '-k', url]) print out.read() def test_006_reject_long_urls(self): sock = api.connect_tcp( ('127.0.0.1', 12346)) path_parts = [] for ii in range(3000): path_parts.append('path') path = '/'.join(path_parts) request = 'GET /%s HTTP/1.0\r\nHost: localhost\r\n\r\n' % path 
fd = sock.makeGreenFile() fd.write(request) result = fd.readline() status = result.split(' ')[1] self.assertEqual(status, '414') fd.close() def test_007_get_arg(self): # define a new handler that does a get_arg as well as a read_body def new_handle_request(req): a = req.get_arg('a') body = req.read_body() req.write('a is %s, body is %s' % (a, body)) self.site.handle_request = new_handle_request sock = api.connect_tcp( ('127.0.0.1', 12346)) request = '\r\n'.join(( 'POST /%s HTTP/1.0', 'Host: localhost', 'Content-Length: 3', '', 'a=a')) fd = sock.makeGreenFile() fd.write(request) # send some junk after the actual request fd.write('01234567890123456789') reqline, headers, body = read_http(sock) self.assertEqual(body, 'a is a, body is a=a') fd.close() def test_008_correctresponse(self): sock = api.connect_tcp( ('127.0.0.1', 12346)) fd = sock.makeGreenFile() fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') response_line_200,_,_ = read_http(sock) fd.write('GET /notexist HTTP/1.1\r\nHost: localhost\r\n\r\n') response_line_404,_,_ = read_http(sock) fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') response_line_test,_,_ = read_http(sock) self.assertEqual(response_line_200,response_line_test) fd.close() if __name__ == '__main__': main() diff --git a/greentest/pools_test.py b/greentest/pools_test.py index c585412..ed1f8f2 100644 --- a/greentest/pools_test.py +++ b/greentest/pools_test.py @@ -1,238 +1,238 @@ # @author Donovan Preston, Aaron Brashears # # Copyright (c) 2006-2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. from unittest import TestCase, main -from eventlet import api -from eventlet import coros -from eventlet import pools +from eventlib import api +from eventlib import coros +from eventlib import pools class IntPool(pools.Pool): def create(self): self.current_integer = getattr(self, 'current_integer', 0) + 1 return self.current_integer class TestIntPool(TestCase): mode = 'static' def setUp(self): self.pool = IntPool(min_size=0, max_size=4) def test_integers(self): # Do not actually use this pattern in your code. The pool will be # exhausted, and unrestoreable. 
# If you do a get, you should ALWAYS do a put, probably like this: # try: # thing = self.pool.get() # # do stuff # finally: # self.pool.put(thing) # with self.pool.some_api_name() as thing: # # do stuff self.assertEquals(self.pool.get(), 1) self.assertEquals(self.pool.get(), 2) self.assertEquals(self.pool.get(), 3) self.assertEquals(self.pool.get(), 4) def test_free(self): self.assertEquals(self.pool.free(), 4) gotten = self.pool.get() self.assertEquals(self.pool.free(), 3) self.pool.put(gotten) self.assertEquals(self.pool.free(), 4) def test_exhaustion(self): waiter = coros.queue(0) def consumer(): gotten = None try: gotten = self.pool.get() finally: waiter.send(gotten) api.spawn(consumer) one, two, three, four = ( self.pool.get(), self.pool.get(), self.pool.get(), self.pool.get()) self.assertEquals(self.pool.free(), 0) # Let consumer run; nothing will be in the pool, so he will wait api.sleep(0) # Wake consumer self.pool.put(one) # wait for the consumer self.assertEquals(waiter.wait(), one) def test_blocks_on_pool(self): waiter = coros.queue(0) def greedy(): self.pool.get() self.pool.get() self.pool.get() self.pool.get() # No one should be waiting yet. self.assertEquals(self.pool.waiting(), 0) # The call to the next get will unschedule this routine. self.pool.get() # So this send should never be called. waiter.send('Failed!') killable = api.spawn(greedy) # no one should be waiting yet. self.assertEquals(self.pool.waiting(), 0) ## Wait for greedy api.sleep(0) ## Greedy should be blocking on the last get self.assertEquals(self.pool.waiting(), 1) ## Send will never be called, so balance should be 0. self.assertFalse(waiter.ready()) api.kill(killable) def test_ordering(self): # normal case is that items come back out in the # same order they are put one, two = self.pool.get(), self.pool.get() self.pool.put(one) self.pool.put(two) self.assertEquals(self.pool.get(), one) self.assertEquals(self.pool.get(), two) def test_putting_to_queue(self): timer = api.exc_after(0.1, api.TimeoutError) size = 2 self.pool = IntPool(min_size=0, max_size=size) queue = coros.queue() results = [] def just_put(pool_item, index): self.pool.put(pool_item) queue.send(index) for index in xrange(size + 1): pool_item = self.pool.get() api.spawn(just_put, pool_item, index) while results != range(size + 1): x = queue.wait() results.append(x) timer.cancel() class TestAbstract(TestCase): mode = 'static' def test_abstract(self): ## Going for 100% coverage here ## A Pool cannot be used without overriding create() pool = pools.Pool() self.assertRaises(NotImplementedError, pool.get) class TestIntPool2(TestCase): mode = 'static' def setUp(self): self.pool = IntPool(min_size=3, max_size=3) def test_something(self): self.assertEquals(len(self.pool.free_items), 3) ## Cover the clause in get where we get from the free list instead of creating ## an item on get gotten = self.pool.get() self.assertEquals(gotten, 1) class TestOrderAsStack(TestCase): mode = 'static' def setUp(self): self.pool = IntPool(max_size=3, order_as_stack=True) def test_ordering(self): # items come out in the reverse order they are put one, two = self.pool.get(), self.pool.get() self.pool.put(one) self.pool.put(two) self.assertEquals(self.pool.get(), two) self.assertEquals(self.pool.get(), one) class RaisePool(pools.Pool): def create(self): raise RuntimeError() class TestCreateRaises(TestCase): mode = 'static' def setUp(self): self.pool = RaisePool(max_size=3) def test_it(self): self.assertEquals(self.pool.free(), 3) self.assertRaises(RuntimeError, self.pool.get) 
self.assertEquals(self.pool.free(), 3) ALWAYS = RuntimeError('I always fail') SOMETIMES = RuntimeError('I fail half the time') class TestTookTooLong(Exception): pass class TestFan(TestCase): mode = 'static' def setUp(self): self.timer = api.exc_after(1, TestTookTooLong()) self.pool = IntPool(max_size=2) def tearDown(self): self.timer.cancel() def test_with_list(self): list_of_input = ['agent-one', 'agent-two', 'agent-three'] def my_callable(pool_item, next_thing): ## Do some "blocking" (yielding) thing api.sleep(0.01) return next_thing output = self.pool.fan(my_callable, list_of_input) self.assertEquals(list_of_input, output) def test_all_fail(self): def my_failure(pool_item, next_thing): raise ALWAYS self.assertRaises(pools.AllFailed, self.pool.fan, my_failure, range(4)) def test_some_fail(self): def my_failing_callable(pool_item, next_thing): if next_thing % 2: raise SOMETIMES return next_thing self.assertRaises(pools.SomeFailed, self.pool.fan, my_failing_callable, range(4)) if __name__ == '__main__': main() diff --git a/greentest/processes_test.py b/greentest/processes_test.py index a417193..906034d 100644 --- a/greentest/processes_test.py +++ b/greentest/processes_test.py @@ -1,134 +1,134 @@ # @author Donovan Preston, Aaron Brashears # # Copyright (c) 2006-2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
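Before the ProcessPool tests: the pattern they exercise is to get a child process from the pool, talk to it over its stdin/stdout, and always put it back. A minimal sketch along those lines, using only calls shown in the tests:

from eventlib import processes

pool = processes.ProcessPool('cat')        # pool of child 'cat' processes
proc = pool.get()
try:
    proc.write('ping')
    proc.close_stdin()
    print proc.read()                      # -> 'ping'
finally:
    pool.put(proc)                         # always hand the worker back to the pool
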
import sys from unittest import TestCase, main -from eventlet import processes +from eventlib import processes class TestEchoPool(TestCase): def setUp(self): self.pool = processes.ProcessPool('echo', ["hello"]) def test_echo(self): result = None proc = self.pool.get() try: result = proc.read() finally: self.pool.put(proc) self.assertEquals(result, 'hello\n') def test_read_eof(self): proc = self.pool.get() try: proc.read() self.assertRaises(processes.DeadProcess, proc.read) finally: self.pool.put(proc) class TestCatPool(TestCase): def setUp(self): self.pool = processes.ProcessPool('cat') def test_cat(self): result = None proc = self.pool.get() try: proc.write('goodbye') proc.close_stdin() result = proc.read() finally: self.pool.put(proc) self.assertEquals(result, 'goodbye') def test_write_to_dead(self): result = None proc = self.pool.get() try: proc.write('goodbye') proc.close_stdin() result = proc.read() self.assertRaises(processes.DeadProcess, proc.write, 'foo') finally: self.pool.put(proc) def test_close(self): result = None proc = self.pool.get() try: proc.write('hello') proc.close() self.assertRaises(processes.DeadProcess, proc.write, 'goodbye') finally: self.pool.put(proc) class TestDyingProcessesLeavePool(TestCase): def setUp(self): self.pool = processes.ProcessPool('echo', ['hello'], max_size=1) def test_dead_process_not_inserted_into_pool(self): proc = self.pool.get() try: try: result = proc.read() self.assertEquals(result, 'hello\n') result = proc.read() except processes.DeadProcess: pass finally: self.pool.put(proc) proc2 = self.pool.get() self.assert_(proc is not proc2) class TestProcessLivesForever(TestCase): def setUp(self): self.pool = processes.ProcessPool(sys.executable, ['-c', 'print "y"; import time; time.sleep(0.4); print "y"'], max_size=1) def test_reading_twice_from_same_process(self): # this test is a little timing-sensitive in that if the sub-process # completes its sleep before we do a full put/get then it will fail proc = self.pool.get() try: result = proc.read(2) self.assertEquals(result, 'y\n') finally: self.pool.put(proc) proc2 = self.pool.get() self.assert_(proc is proc2, "This will fail if there is a timing issue") try: result = proc2.read(2) self.assertEquals(result, 'y\n') finally: self.pool.put(proc2) if __name__ == '__main__': main() diff --git a/greentest/runall.py b/greentest/runall.py index 34a7b10..0b5fd2e 100755 --- a/greentest/runall.py +++ b/greentest/runall.py @@ -1,172 +1,172 @@ #!/usr/bin/python2 # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """Run tests for different configurations (hub/reactor)""" import sys import os import random from glob import glob from optparse import OptionParser, Option from copy import copy from time import time from with_eventlet import import_reactor first_hubs = ['poll', 'selects', 'twistedr'] first_reactors = ['selectreactor', 'pollreactor'] COMMAND = sys.executable + ' ./record_results.py ' + sys.executable + ' ./with_timeout.py ./with_eventlet.py %(setup)s %(test)s' PARSE_PERIOD = 10 # the following aren't in the default list unless --all option present NOT_HUBS = set() NOT_REACTORS = set(['wxreactor', 'glib2reactor', 'gtk2reactor']) NOT_TESTS = set(['db_pool_test.py']) def w(s): sys.stderr.write("%s\n" % (s, )) def enum_hubs(): - from eventlet.api import use_hub - hubs = glob('../eventlet/hubs/*.py') + from eventlib.api import use_hub + hubs = glob('../eventlib/hubs/*.py') hubs = [os.path.basename(h)[:-3] for h in hubs] hubs = [h for h in hubs if h[:1]!='_'] hubs = set(hubs) hubs.discard('hub') hubs -= NOT_HUBS result = [] for hub in hubs: try: use_hub(hub) except Exception, ex: print 'Skipping hub %s: %s' % (hub, ex) else: result.append(hub) return result def enum_reactors(): try: import twisted except ImportError: return [] p = os.path.join(os.path.dirname(twisted.__file__), 'internet', '*?reactor.py') files = glob(p) all_reactors = [os.path.basename(f[:-3]) for f in files] all_reactors = set(all_reactors) - NOT_REACTORS selected_reactors = [] for reactor in all_reactors: try: import_reactor(reactor) except Exception, ex: print 'Skipping reactor %s: %s' % (reactor, ex) else: selected_reactors.append(reactor) return selected_reactors def enum_tests(): tests = [] tests += glob('test_*.py') tests += glob('*_test.py') tests = set(tests) - NOT_TESTS - set(['test_support.py']) return tests def cmd(program): w(program) res = os.system(program)>>8 w(res) if res==1: sys.exit(1) return res def check_stringlist(option, opt, value): return value.split(',') class MyOption(Option): TYPES = Option.TYPES + ("stringlist",) TYPE_CHECKER = copy(Option.TYPE_CHECKER) TYPE_CHECKER["stringlist"] = check_stringlist def main(): global NOT_HUBS, NOT_REACTORS, NOT_TESTS parser = OptionParser(option_class=MyOption) parser.add_option('-u', '--hubs', type='stringlist') parser.add_option('-r', '--reactors', type='stringlist') parser.add_option('--ignore-hubs', type='stringlist', default=[]) parser.add_option('--ignore-reactors', type='stringlist', default=[]) parser.add_option('--ignore-tests', type='stringlist', default=[]) parser.add_option('-s', '--show', help='show default values and exit', action='store_true', default=False) parser.add_option('-a', '--all', action='store_true', default=False) options, args = parser.parse_args() options.tests = args or None if options.all: NOT_HUBS = NOT_REACTORS = NOT_TESTS = set() if options.hubs is None: options.hubs = enum_hubs() if options.reactors is None: options.reactors = enum_reactors() if options.tests is None: options.tests = enum_tests() tests = [] for t in options.tests: tests.extend(glob(t)) options.tests = tests options.hubs = list(set(options.hubs) - set(options.ignore_hubs)) options.reactors = list(set(options.reactors) - set(options.ignore_reactors)) options.tests = list(set(options.tests) - 
set(options.ignore_tests)) random.shuffle(options.hubs) options.hubs.sort(key=first_hubs.__contains__, reverse=True) random.shuffle(options.reactors) options.reactors.sort(key=first_reactors.__contains__, reverse=True) random.shuffle(options.tests) print 'hubs: %s' % ','.join(options.hubs) print 'reactors: %s' % ','.join(options.reactors) print 'tests: %s' % ','.join(options.tests) if options.show: return setups = [] for hub in options.hubs: if hub == 'twistedr': for reactor in options.reactors: setups.append('--hub twistedr --reactor %s' % reactor) else: setups.append('--hub %s' % hub) last_time = time() for setup in setups: w(setup) for test in options.tests: w(test) cmd(COMMAND % locals()) if time()-last_time>PARSE_PERIOD: os.system('./parse_results.py') last_time = time() os.system('./parse_results.py') if __name__=='__main__': main() diff --git a/greentest/saranwrap_test.py b/greentest/saranwrap_test.py index 3380d7d..e6c5914 100755 --- a/greentest/saranwrap_test.py +++ b/greentest/saranwrap_test.py @@ -1,363 +1,363 @@ #!/usr/bin/python2 # @file test_saranwrap.py # @brief Test cases for saranwrap. # # Copyright (c) 2007, Linden Research, Inc. # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
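The saranwrap tests below revolve around one idea: saranwrap.wrap() runs an object or module in a child process and returns a proxy whose attribute accesses and calls are forwarded to it. A small sketch of that usage, limited to calls the tests themselves make:

import uuid
from eventlib import saranwrap

prox = saranwrap.wrap(uuid)                # uuid now runs in a child process
print prox.uuid4()                         # computed in the child, result proxied back
print saranwrap.status(prox)['pid']        # pid of the wrapped server process
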
-from eventlet import api, saranwrap -from eventlet.pool import Pool +from eventlib import api, saranwrap +from eventlib.pool import Pool import os import sys import tempfile import time import unittest import uuid # random test stuff def list_maker(): return [0,1,2] one = 1 two = 2 three = 3 class CoroutineCallingClass(object): def __init__(self): self._my_dict = {} def run_coroutine(self): api.spawn(self._add_random_key) def _add_random_key(self): self._my_dict['random'] = 'yes, random' def get_dict(self): return self._my_dict class TestSaranwrap(unittest.TestCase): def assert_server_exists(self, prox): self.assert_(saranwrap.status(prox)) prox.foo = 0 self.assertEqual(0, prox.foo) def test_wrap_tuple(self): my_tuple = (1, 2) prox = saranwrap.wrap(my_tuple) self.assertEqual(prox[0], 1) self.assertEqual(prox[1], 2) self.assertEqual(len(my_tuple), 2) def test_wrap_string(self): my_object = "whatever" prox = saranwrap.wrap(my_object) self.assertEqual(str(my_object), str(prox)) self.assertEqual(len(my_object), len(prox)) self.assertEqual(my_object.join(['a', 'b']), prox.join(['a', 'b'])) def test_wrap_uniterable(self): # here we're treating the exception as just a normal class prox = saranwrap.wrap(FloatingPointError()) def index(): prox[0] def key(): prox['a'] self.assertRaises(IndexError, index) self.assertRaises(TypeError, key) def test_wrap_dict(self): my_object = {'a':1} prox = saranwrap.wrap(my_object) self.assertEqual('a', prox.keys()[0]) self.assertEqual(1, prox['a']) self.assertEqual(str(my_object), str(prox)) self.assertEqual('saran:' + repr(my_object), repr(prox)) self.assertEqual('saran:' + `my_object`, `prox`) def test_wrap_module_class(self): prox = saranwrap.wrap(uuid) self.assertEqual(saranwrap.Proxy, type(prox)) id = prox.uuid4() self.assertEqual(id.get_version(), uuid.uuid4().get_version()) self.assert_(repr(prox.uuid4)) def test_wrap_eq(self): prox = saranwrap.wrap(uuid) id1 = prox.uuid4() id2 = prox.UUID(str(id1)) self.assertEqual(id1, id2) id3 = prox.uuid4() self.assert_(id1 != id3) def test_wrap_nonzero(self): prox = saranwrap.wrap(uuid) id1 = prox.uuid4() self.assert_(bool(id1)) prox2 = saranwrap.wrap([1, 2, 3]) self.assert_(bool(prox2)) def test_multiple_wraps(self): prox1 = saranwrap.wrap(uuid) prox2 = saranwrap.wrap(uuid) x1 = prox1.uuid4() x2 = prox1.uuid4() del x2 x3 = prox2.uuid4() def test_dict_passthru(self): prox = saranwrap.wrap(uuid) x = prox.uuid4() self.assertEqual(type(x.__dict__), saranwrap.ObjectProxy) # try it all on one line just for the sake of it self.assertEqual(type(saranwrap.wrap(uuid).uuid4().__dict__), saranwrap.ObjectProxy) def test_is_value(self): server = saranwrap.Server(None, None, None) self.assert_(server.is_value(None)) def test_wrap_getitem(self): prox = saranwrap.wrap([0,1,2]) self.assertEqual(prox[0], 0) def test_wrap_setitem(self): prox = saranwrap.wrap([0,1,2]) prox[1] = 2 self.assertEqual(prox[1], 2) def test_raising_exceptions(self): prox = saranwrap.wrap(uuid) def nofunc(): prox.never_name_a_function_like_this() self.assertRaises(AttributeError, nofunc) def test_raising_weird_exceptions(self): # the recursion is killing me! 
prox = saranwrap.wrap(saranwrap) try: prox.raise_a_weird_error() self.assert_(False) except: import sys ex = sys.exc_info()[0] self.assertEqual(ex, "oh noes you can raise a string") self.assert_server_exists(prox) def test_unpicklable_server_exception(self): prox = saranwrap.wrap(saranwrap) def unpickle(): prox.raise_an_unpicklable_error() self.assertRaises(saranwrap.UnrecoverableError, unpickle) # It's basically dead #self.assert_server_exists(prox) def test_pickleable_server_exception(self): prox = saranwrap.wrap(saranwrap) def fperror(): prox.raise_standard_error() self.assertRaises(FloatingPointError, fperror) self.assert_server_exists(prox) def test_print_does_not_break_wrapper(self): prox = saranwrap.wrap(saranwrap) prox.print_string('hello') self.assert_server_exists(prox) def test_stderr_does_not_break_wrapper(self): prox = saranwrap.wrap(saranwrap) prox.err_string('goodbye') self.assert_server_exists(prox) def assertLessThan(self, a, b): self.assert_(a < b, "%s is not less than %s" % (a, b)) def test_status(self): prox = saranwrap.wrap(time) a = prox.gmtime(0) status = saranwrap.status(prox) self.assertEqual(status['object_count'], 1) self.assertEqual(status['next_id'], 2) self.assert_(status['pid']) # can't guess what it will be # status of an object should be the same as the module self.assertEqual(saranwrap.status(a), status) # create a new one then immediately delete it prox.gmtime(1) is_id = prox.ctime(1) # sync up deletes status = saranwrap.status(prox) self.assertEqual(status['object_count'], 1) self.assertEqual(status['next_id'], 3) prox2 = saranwrap.wrap(uuid) self.assert_(status['pid'] != saranwrap.status(prox2)['pid']) def test_del(self): prox = saranwrap.wrap(time) delme = prox.gmtime(0) status_before = saranwrap.status(prox) #print status_before['objects'] del delme # need to do an access that doesn't create an object # in order to sync up the deleted objects prox.ctime(1) status_after = saranwrap.status(prox) #print status_after['objects'] self.assertLessThan(status_after['object_count'], status_before['object_count']) def test_contains(self): prox = saranwrap.wrap({'a':'b'}) self.assert_('a' in prox) self.assert_('x' not in prox) def test_variable_and_keyword_arguments_with_function_calls(self): import optparse prox = saranwrap.wrap(optparse) parser = prox.OptionParser() z = parser.add_option('-n', action='store', type='string', dest='n') opts,args = parser.parse_args(["-nfoo"]) self.assertEqual(opts.n, 'foo') def test_original_proxy_going_out_of_scope(self): def make_uuid(): prox = saranwrap.wrap(uuid) # after this function returns, prox should fall out of scope return prox.uuid4() tid = make_uuid() self.assertEqual(tid.get_version(), uuid.uuid4().get_version()) def make_list(): from greentest import saranwrap_test prox = saranwrap.wrap(saranwrap_test.list_maker) # after this function returns, prox should fall out of scope return prox() proxl = make_list() self.assertEqual(proxl[2], 2) def test_status_of_none(self): try: saranwrap.status(None) self.assert_(False) except AttributeError, e: pass def test_not_inheriting_pythonpath(self): # construct a fake module in the temp directory temp_dir = tempfile.mkdtemp("saranwrap_test") fp = open(os.path.join(temp_dir, "jitar_hero.py"), "w") fp.write("""import os, sys pypath = os.environ['PYTHONPATH'] sys_path = sys.path""") fp.close() # this should fail because we haven't stuck the temp_dir in our path yet prox = saranwrap.wrap_module('jitar_hero') import cPickle try: prox.pypath self.fail() except cPickle.UnpicklingError: 
pass # now try to saranwrap it sys.path.append(temp_dir) try: import jitar_hero prox = saranwrap.wrap(jitar_hero) self.assert_(prox.pypath.count(temp_dir)) self.assert_(prox.sys_path.count(temp_dir)) finally: import shutil shutil.rmtree(temp_dir) sys.path.remove(temp_dir) def test_contention(self): from greentest import saranwrap_test prox = saranwrap.wrap(saranwrap_test) pool = Pool(max_size=4) waiters = [] waiters.append(pool.execute(lambda: self.assertEquals(prox.one, 1))) waiters.append(pool.execute(lambda: self.assertEquals(prox.two, 2))) waiters.append(pool.execute(lambda: self.assertEquals(prox.three, 3))) for waiter in waiters: waiter.wait() def test_copy(self): import copy compound_object = {'a':[1,2,3]} prox = saranwrap.wrap(compound_object) def make_assertions(copied): self.assert_(isinstance(copied, dict)) self.assert_(isinstance(copied['a'], list)) self.assertEquals(copied, compound_object) self.assertNotEqual(id(compound_object), id(copied)) make_assertions(copy.copy(prox)) make_assertions(copy.deepcopy(prox)) def test_list_of_functions(self): return # this test is known to fail, we can implement it sometime in the future if we wish from greentest import saranwrap_test prox = saranwrap.wrap([saranwrap_test.list_maker]) self.assertEquals(list_maker(), prox[0]()) def test_under_the_hood_coroutines(self): # so, we want to write a class which uses a coroutine to call # a function. Then we want to saranwrap that class, have # the object call the coroutine and verify that it ran from greentest import saranwrap_test mod_proxy = saranwrap.wrap(saranwrap_test) obj_proxy = mod_proxy.CoroutineCallingClass() obj_proxy.run_coroutine() # sleep for a bit to make sure out coroutine ran by the time # we check the assert below api.sleep(0.1) self.assert_( 'random' in obj_proxy.get_dict(), 'Coroutine in saranwrapped object did not run') def test_child_process_death(self): prox = saranwrap.wrap({}) pid = saranwrap.getpid(prox) self.assertEqual(os.kill(pid, 0), None) # assert that the process is running del prox # removing all references to the proxy should kill the child process api.sleep(0.1) # need to let the signal handler run self.assertRaises(OSError, os.kill, pid, 0) # raises OSError if pid doesn't exist def test_detection_of_server_crash(self): # make the server crash here pass def test_equality_with_local_object(self): # we'll implement this if there's a use case for it pass def test_non_blocking(self): # here we test whether it's nonblocking pass if __name__ == '__main__': unittest.main() diff --git a/greentest/test__api.py b/greentest/test__api.py index ed08e62..9a6b2c6 100644 --- a/greentest/test__api.py +++ b/greentest/test__api.py @@ -1,60 +1,60 @@ # Copyright (c) 2008 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import unittest -from eventlet.api import sleep, spawn, kill, with_timeout, TimeoutError +from eventlib.api import sleep, spawn, kill, with_timeout, TimeoutError DELAY = 0.1 class Test(unittest.TestCase): def test_killing_dormant(self): state = [] def test(): try: state.append('start') sleep(DELAY) except: state.append('except') # catching GreenletExit pass # when switching to hub, hub makes itself the parent of this greenlet, # thus after the function's done, the control will go to the parent # QQQ why the first sleep is not enough? sleep(0) state.append('finished') g = spawn(test) sleep(DELAY/2) assert state == ['start'], state kill(g) # will not get there, unless switching is explicitly scheduled by kill assert state == ['start', 'except'], state sleep(DELAY) assert state == ['start', 'except', 'finished'], state def test_nested_with_timeout(self): def func(): return with_timeout(0.2, sleep, 2, timeout_value=1) self.assertRaises(TimeoutError, with_timeout, 0.1, func) if __name__=='__main__': unittest.main() diff --git a/greentest/test__api_timeout.py b/greentest/test__api_timeout.py index 1f59a44..c5bea8a 100644 --- a/greentest/test__api_timeout.py +++ b/greentest/test__api_timeout.py @@ -1,127 +1,127 @@ # Copyright (c) 2008 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
from __future__ import with_statement import sys import unittest import weakref import time -from eventlet.api import sleep, timeout, TimeoutError, _SilentException +from eventlib.api import sleep, timeout, TimeoutError, _SilentException DELAY = 0.01 class Error(Exception): pass class Test(unittest.TestCase): def test_api(self): # Nothing happens if with-block finishes before the timeout expires with timeout(DELAY*2): sleep(DELAY) sleep(DELAY*2) # check if timer was actually cancelled # An exception will be raised if it's not try: with timeout(DELAY): sleep(DELAY*2) except TimeoutError: pass else: raise AssertionError('must raise TimeoutError') # You can customize the exception raised: try: with timeout(DELAY, IOError("Operation takes way too long")): sleep(DELAY*2) except IOError, ex: assert str(ex)=="Operation takes way too long", repr(ex) # Providing classes instead of values should be possible too: try: with timeout(DELAY, ValueError): sleep(DELAY*2) except ValueError: pass # basically, anything that greenlet.throw accepts work: try: 1/0 except: try: with timeout(DELAY, *sys.exc_info()): sleep(DELAY*2) raise AssertionError('should not get there') raise AssertionError('should not get there') except ZeroDivisionError: pass else: raise AssertionError('should not get there') # It's possible to cancel the timer inside the block: with timeout(DELAY) as timer: timer.cancel() sleep(DELAY*2) # To silent the exception, pass None as second parameter. The with-block # will be interrupted with _SilentException, but it won't be propagated # outside. XDELAY=0.1 start = time.time() with timeout(XDELAY, None): sleep(XDELAY*2) delta = (time.time()-start) assert delta>> ', open(path).read(), re.M): s = doctest.DocTestSuite(m) print '%s (from %s): %s tests' % (m, path, len(s._tests)) suite.addTest(s) modules_count += 1 tests_count += len(s._tests) print 'Total: %s tests in %s modules' % (tests_count, modules_count) runner = unittest.TextTestRunner(verbosity=2) runner.run(suite) diff --git a/greentest/test__event.py b/greentest/test__event.py index 311ddbe..048eae8 100644 --- a/greentest/test__event.py +++ b/greentest/test__event.py @@ -1,63 +1,63 @@ # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
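The event tests below rely on the basic send/wait handshake of coros.event: one coroutine blocks on wait() until another calls send(). A minimal sketch of that exchange, using the same spawn/sleep helpers as the tests:

from eventlib.coros import event
from eventlib.api import spawn, sleep

e = event()

def waiter():
    print 'received', e.wait()             # blocks this coroutine until send()

spawn(waiter)
sleep(0)                                   # let the waiter block on e.wait()
e.send('hello')                            # wakes the waiter with the value
sleep(0)                                   # give the waiter a chance to print
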
import unittest -from eventlet.coros import event -from eventlet.api import spawn, sleep, exc_after, with_timeout +from eventlib.coros import event +from eventlib.api import spawn, sleep, exc_after, with_timeout from greentest import LimitedTestCase DELAY= 0.01 class TestEvent(LimitedTestCase): def test_send_exc(self): log = [] e = event() def waiter(): try: result = e.wait() log.append(('received', result)) except Exception, ex: log.append(('catched', ex)) spawn(waiter) sleep(0) # let waiter to block on e.wait() obj = Exception() e.send(exc=obj) sleep(0) assert log == [('catched', obj)], log def test_send(self): event1 = event() event2 = event() spawn(event1.send, 'hello event1') exc_after(0, ValueError('interrupted')) try: result = event1.wait() except ValueError: X = object() result = with_timeout(DELAY, event2.wait, timeout_value=X) assert result is X, 'Nobody sent anything to event2 yet it received %r' % (result, ) if __name__=='__main__': unittest.main() diff --git a/greentest/test__greenness.py b/greentest/test__greenness.py index 282902f..806832c 100644 --- a/greentest/test__greenness.py +++ b/greentest/test__greenness.py @@ -1,65 +1,65 @@ # Copyright (c) 2008 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. -"""Test than modules in eventlet.green package are indeed green. +"""Test than modules in eventlib.green package are indeed green. To do that spawn a green server and then access it using a green socket. If either operation blocked the whole script would block and timeout. """ import unittest from greentest import test_support -from eventlet.green import urllib2, BaseHTTPServer -from eventlet.api import spawn, kill +from eventlib.green import urllib2, BaseHTTPServer +from eventlib.api import spawn, kill port = 18341 def start_http_server(): server_address = ('', port) BaseHTTPServer.BaseHTTPRequestHandler.protocol_version = "HTTP/1.0" httpd = BaseHTTPServer.HTTPServer(server_address, BaseHTTPServer.BaseHTTPRequestHandler) sa = httpd.socket.getsockname() #print "Serving HTTP on", sa[0], "port", sa[1], "..." 
httpd.request_count = 0 def serve(): httpd.handle_request() httpd.request_count += 1 return spawn(serve), httpd class TestGreenness(unittest.TestCase): def setUp(self): self.gthread, self.server = start_http_server() #print 'Spawned the server' def tearDown(self): self.server.server_close() kill(self.gthread) def test_urllib2(self): self.assertEqual(self.server.request_count, 0) try: urllib2.urlopen('http://127.0.0.1:%s' % port) assert False, 'should not get there' except urllib2.HTTPError, ex: assert ex.code == 501, `ex` self.assertEqual(self.server.request_count, 1) if __name__ == '__main__': test_support.run_unittest(TestGreenness) diff --git a/greentest/test__hub.py b/greentest/test__hub.py index 7aedd8c..6dfc31f 100644 --- a/greentest/test__hub.py +++ b/greentest/test__hub.py @@ -1,82 +1,82 @@ # Copyright (c) 2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
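test__greenness.py above works because the server and the client share one OS thread: if either the green BaseHTTPServer or the green urllib2 call blocked for real, the whole script would hang rather than finish. The same round trip as a standalone sketch, not part of the patch; the port number is illustrative:

from eventlib.api import spawn, kill
from eventlib.green import urllib2, BaseHTTPServer

BaseHTTPServer.BaseHTTPRequestHandler.protocol_version = "HTTP/1.0"
httpd = BaseHTTPServer.HTTPServer(('127.0.0.1', 18342),
                                  BaseHTTPServer.BaseHTTPRequestHandler)
server = spawn(httpd.handle_request)   # serve exactly one request, greenly

try:
    urllib2.urlopen('http://127.0.0.1:18342')
except urllib2.HTTPError, ex:
    print 'server answered with', ex.code   # 501: the base handler implements no methods
finally:
    httpd.server_close()
    kill(server)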
import unittest import time -from eventlet import api -from eventlet.green import socket +from eventlib import api +from eventlib.green import socket DELAY = 0.1 class TestScheduleCall(unittest.TestCase): def test_local(self): lst = [1] api.spawn(api.get_hub().schedule_call_local, DELAY, lst.pop) api.sleep(DELAY*2) assert lst == [1], lst def test_global(self): lst = [1] api.spawn(api.get_hub().schedule_call_global, DELAY, lst.pop) api.sleep(DELAY*2) assert lst == [], lst class TestCloseSocketWhilePolling(unittest.TestCase): def test(self): try: sock = socket.socket() api.call_after(0, sock.close) sock.connect(('python.org', 81)) except Exception: api.sleep(0) else: assert False, 'expected an error here' class TestExceptionInMainloop(unittest.TestCase): def test_sleep(self): # even if there was an error in the mainloop, the hub should continue to work start = time.time() api.sleep(DELAY) delay = time.time() - start assert delay >= DELAY*0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (delay, DELAY) def fail(): 1/0 api.get_hub().schedule_call_global(0, fail) start = time.time() api.sleep(DELAY) delay = time.time() - start assert delay >= DELAY*0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (delay, DELAY) if __name__=='__main__': unittest.main() diff --git a/greentest/test__pool.py b/greentest/test__pool.py index b64e7e1..385a804 100644 --- a/greentest/test__pool.py +++ b/greentest/test__pool.py @@ -1,199 +1,199 @@ -from eventlet import pool, coros, api +from eventlib import pool, coros, api from greentest import LimitedTestCase from unittest import main class TestCoroutinePool(LimitedTestCase): klass = pool.Pool def test_execute_async(self): done = coros.event() def some_work(): done.send() pool = self.klass(0, 2) pool.execute_async(some_work) done.wait() def test_execute(self): value = 'return value' def some_work(): return value pool = self.klass(0, 2) worker = pool.execute(some_work) self.assertEqual(value, worker.wait()) def test_multiple_coros(self): evt = coros.event() results = [] def producer(): results.append('prod') evt.send() def consumer(): results.append('cons1') evt.wait() results.append('cons2') pool = self.klass(0, 2) done = pool.execute(consumer) pool.execute_async(producer) done.wait() self.assertEquals(['cons1', 'prod', 'cons2'], results) def test_timer_cancel(self): # this test verifies that local timers are not fired # outside of the context of the execute method timer_fired = [] def fire_timer(): timer_fired.append(True) def some_work(): api.get_hub().schedule_call_local(0, fire_timer) pool = self.klass(0, 2) worker = pool.execute(some_work) worker.wait() api.sleep(0) self.assertEquals(timer_fired, []) def test_reentrant(self): pool = self.klass(0,1) def reenter(): waiter = pool.execute(lambda a: a, 'reenter') self.assertEqual('reenter', waiter.wait()) outer_waiter = pool.execute(reenter) outer_waiter.wait() evt = coros.event() def reenter_async(): pool.execute_async(lambda a: a, 'reenter') evt.send('done') pool.execute_async(reenter_async) evt.wait() def test_stderr_raising(self): # testing that really egregious errors in the error handling code # (that prints tracebacks to stderr) don't cause the pool to lose # any members import sys pool = self.klass(min_size=1, max_size=1) def crash(*args, **kw): raise RuntimeError("Whoa") class FakeFile(object): write = crash # we're going to do this by causing the traceback.print_exc in # safe_apply to raise an exception and thus exit _main_loop normal_err = sys.stderr try: sys.stderr = FakeFile() 
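TestScheduleCall in test__hub.py turns on the difference between the two hub calls: a timer scheduled with schedule_call_local is cancelled when the greenlet that created it exits, while schedule_call_global fires regardless. The contrast in isolation, as a sketch that is not part of the patch (the delay is illustrative):

from eventlib import api

DELAY = 0.1
fired = []

# Local timer: it belongs to the spawned greenlet, which exits as soon as the
# scheduling call returns, so the callback is cancelled and never runs.
api.spawn(api.get_hub().schedule_call_local, DELAY, lambda: fired.append('local'))

# Global timer: it survives the greenlet that scheduled it.
api.spawn(api.get_hub().schedule_call_global, DELAY, lambda: fired.append('global'))

api.sleep(DELAY * 2)
print fired    # expected, per the test above: ['global']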
waiter = pool.execute(crash) self.assertRaises(RuntimeError, waiter.wait) # the pool should have something free at this point since the # waiter returned # pool.Pool change: if an exception is raised during execution of a link, # the rest of the links are scheduled to be executed on the next hub iteration # this introduces a delay in updating pool.sem which makes pool.free() report 0 # therefore, sleep: api.sleep(0) self.assertEqual(pool.free(), 1) # shouldn't block when trying to get t = api.exc_after(0.1, api.TimeoutError) try: pool.execute(api.sleep, 1) finally: t.cancel() finally: sys.stderr = normal_err def test_track_events(self): pool = self.klass(track_events=True) for x in range(6): pool.execute(lambda n: n, x) for y in range(6): pool.wait() def test_track_slow_event(self): pool = self.klass(track_events=True) def slow(): api.sleep(0.1) return 'ok' pool.execute(slow) self.assertEquals(pool.wait(), 'ok') def test_pool_smash(self): # The premise is that a coroutine in a Pool tries to get a token out # of a token pool but times out before getting the token. We verify # that neither pool is adversely affected by this situation. - from eventlet import pools + from eventlib import pools pool = self.klass(min_size=1, max_size=1) tp = pools.TokenPool(max_size=1) token = tp.get() # empty pool def do_receive(tp): api.exc_after(0, RuntimeError()) try: t = tp.get() self.fail("Shouldn't have recieved anything from the pool") except RuntimeError: return 'timed out' # the execute makes the token pool expect that coroutine, but then # immediately cuts bait e1 = pool.execute(do_receive, tp) self.assertEquals(e1.wait(), 'timed out') # the pool can get some random item back def send_wakeup(tp): tp.put('wakeup') api.spawn(send_wakeup, tp) # now we ask the pool to run something else, which should not # be affected by the previous send at all def resume(): return 'resumed' e2 = pool.execute(resume) self.assertEquals(e2.wait(), 'resumed') # we should be able to get out the thing we put in there, too self.assertEquals(tp.get(), 'wakeup') class PoolBasicTests(LimitedTestCase): klass = pool.Pool def test_execute_async(self): p = self.klass(max_size=2) self.assertEqual(p.free(), 2) r = [] def foo(a): r.append(a) evt = p.execute(foo, 1) self.assertEqual(p.free(), 1) evt.wait() self.assertEqual(r, [1]) api.sleep(0) self.assertEqual(p.free(), 2) #Once the pool is exhausted, calling an execute forces a yield. 
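PoolBasicTests below exercises the two ways of handing work to a pool.Pool: execute() takes a slot and returns a waiter whose wait() blocks until the coroutine finishes, while execute_async() is fire and forget; free() reports the remaining slots, and an exhausted pool forces a yield. A condensed sketch of that lifecycle, not part of the patch:

from eventlib import pool, api

p = pool.Pool(max_size=2)
print p.free()               # 2: both slots available

def work(n):
    return n * 10

waiter = p.execute(work, 1)  # takes a slot and returns a waiter
print p.free()               # 1
print waiter.wait()          # 10: blocks until the coroutine finishes

api.sleep(0)                 # let the pool reclaim the slot
print p.free()               # 2 again

p.execute_async(work, 2)     # fire and forget: nothing to wait on
api.sleep(0)                 # yield so the async job gets to run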
p.execute_async(foo, 2) self.assertEqual(1, p.free()) self.assertEqual(r, [1]) p.execute_async(foo, 3) self.assertEqual(0, p.free()) self.assertEqual(r, [1]) p.execute_async(foo, 4) self.assertEqual(r, [1,2,3]) api.sleep(0) self.assertEqual(r, [1,2,3,4]) def test_execute(self): p = self.klass() evt = p.execute(lambda a: ('foo', a), 1) self.assertEqual(evt.wait(), ('foo', 1)) if __name__=='__main__': main() diff --git a/greentest/test__proc.py b/greentest/test__proc.py index bc51bbb..560e5e3 100644 --- a/greentest/test__proc.py +++ b/greentest/test__proc.py @@ -1,395 +1,395 @@ # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import sys import unittest -from eventlet.api import sleep, with_timeout -from eventlet import api, proc, coros +from eventlib.api import sleep, with_timeout +from eventlib import api, proc, coros from greentest import LimitedTestCase DELAY = 0.01 class TestLink_Signal(LimitedTestCase): def test_send(self): s = proc.Source() q1, q2, q3 = coros.queue(), coros.queue(), coros.queue() s.link_value(q1) self.assertRaises(api.TimeoutError, s.wait, 0) assert s.wait(0, None) is None assert s.wait(0.001, None) is None self.assertRaises(api.TimeoutError, s.wait, 0.001) s.send(1) assert not q1.ready() assert s.wait()==1 api.sleep(0) assert q1.ready() s.link_exception(q2) s.link(q3) assert not q2.ready() api.sleep(0) assert q3.ready() assert s.wait()==1 def test_send_exception(self): s = proc.Source() q1, q2, q3 = coros.queue(), coros.queue(), coros.queue() s.link_exception(q1) s.send_exception(OSError('hello')) api.sleep(0) assert q1.ready() s.link_value(q2) s.link(q3) assert not q2.ready() api.sleep(0) assert q3.ready() self.assertRaises(OSError, q1.wait) self.assertRaises(OSError, q3.wait) self.assertRaises(OSError, s.wait) class TestProc(LimitedTestCase): def test_proc(self): p = proc.spawn(lambda : 100) receiver = proc.spawn(api.sleep, 1) p.link(receiver) self.assertRaises(proc.LinkedCompleted, receiver.wait) receiver2 = proc.spawn(api.sleep, 1) p.link(receiver2) self.assertRaises(proc.LinkedCompleted, receiver2.wait) def test_event(self): p = proc.spawn(lambda : 100) event = coros.event() p.link(event) self.assertEqual(event.wait(), 100) for i in xrange(3): event2 = coros.event() p.link(event2) self.assertEqual(event2.wait(), 100) def test_current(self): p = proc.spawn(lambda : 100) p.link() self.assertRaises(proc.LinkedCompleted, sleep, 0.1) class TestCase(LimitedTestCase): def link(self, p, listener=None): getattr(p, 
self.link_method)(listener) def tearDown(self): LimitedTestCase.tearDown(self) self.p.unlink() def set_links(self, p, first_time, kill_exc_type): event = coros.event() self.link(p, event) proc_flag = [] def receiver(): sleep(DELAY) proc_flag.append('finished') receiver = proc.spawn(receiver) self.link(p, receiver) queue = coros.queue(1) self.link(p, queue) try: self.link(p) except kill_exc_type: if first_time: raise else: assert first_time, 'not raising here only first time' callback_flag = ['initial'] self.link(p, lambda *args: callback_flag.remove('initial')) for _ in range(10): self.link(p, coros.event()) self.link(p, coros.queue(1)) return event, receiver, proc_flag, queue, callback_flag def set_links_timeout(self, link): # stuff that won't be touched event = coros.event() link(event) proc_finished_flag = [] def myproc(): sleep(10) proc_finished_flag.append('finished') return 555 myproc = proc.spawn(myproc) link(myproc) queue = coros.queue(0) link(queue) return event, myproc, proc_finished_flag, queue def check_timed_out(self, event, myproc, proc_finished_flag, queue): X = object() assert with_timeout(DELAY, event.wait, timeout_value=X) is X assert with_timeout(DELAY, queue.wait, timeout_value=X) is X assert with_timeout(DELAY, proc.waitall, [myproc], timeout_value=X) is X assert proc_finished_flag == [], proc_finished_flag class TestReturn_link(TestCase): link_method = 'link' def test_return(self): def return25(): return 25 p = self.p = proc.spawn(return25) self._test_return(p, True, 25, proc.LinkedCompleted, lambda : sleep(0)) # repeating the same with dead process for _ in xrange(3): self._test_return(p, False, 25, proc.LinkedCompleted, lambda : sleep(0)) def _test_return(self, p, first_time, result, kill_exc_type, action): event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type) # stuff that will time out because there's no unhandled exception: xxxxx = self.set_links_timeout(p.link_exception) try: sleep(DELAY*2) except kill_exc_type: assert first_time, 'raising here only first time' else: assert not first_time, 'Should not raise LinkedKilled here after first time' assert not p, p self.assertEqual(event.wait(), result) self.assertEqual(queue.wait(), result) self.assertRaises(kill_exc_type, receiver.wait) self.assertRaises(kill_exc_type, proc.waitall, [receiver]) sleep(DELAY) assert not proc_flag, proc_flag assert not callback_flag, callback_flag self.check_timed_out(*xxxxx) class TestReturn_link_value(TestReturn_link): sync = False link_method = 'link_value' class TestRaise_link(TestCase): link_method = 'link' def _test_raise(self, p, first_time, kill_exc_type): event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type) xxxxx = self.set_links_timeout(p.link_value) try: sleep(DELAY) except kill_exc_type: assert first_time, 'raising here only first time' else: assert not first_time, 'Should not raise LinkedKilled here after first time' assert not p, p self.assertRaises(ValueError, event.wait) self.assertRaises(ValueError, queue.wait) self.assertRaises(kill_exc_type, receiver.wait) self.assertRaises(kill_exc_type, proc.waitall, [receiver]) sleep(DELAY) assert not proc_flag, proc_flag assert not callback_flag, callback_flag self.check_timed_out(*xxxxx) def test_raise(self): p = self.p = proc.spawn(int, 'badint') self._test_raise(p, True, proc.LinkedFailed) # repeating the same with dead process for _ in xrange(3): self._test_raise(p, False, proc.LinkedFailed) def _test_kill(self, p, first_time, kill_exc_type): 
event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type) xxxxx = self.set_links_timeout(p.link_value) p.kill() try: sleep(DELAY) except kill_exc_type: assert first_time, 'raising here only first time' else: assert not first_time, 'Should not raise LinkedKilled here after first time' assert not p, p self.assertRaises(proc.ProcExit, event.wait) self.assertRaises(proc.ProcExit, queue.wait) self.assertRaises(kill_exc_type, proc.waitall, [receiver]) self.assertRaises(kill_exc_type, receiver.wait) sleep(DELAY) assert not proc_flag, proc_flag assert not callback_flag, callback_flag self.check_timed_out(*xxxxx) def test_kill(self): p = self.p = proc.spawn(sleep, DELAY) self._test_kill(p, True, proc.LinkedKilled) # repeating the same with dead process for _ in xrange(3): self._test_kill(p, False, proc.LinkedKilled) class TestRaise_link_exception(TestCase): link_method = 'link_exception' class TestStuff(unittest.TestCase): def test_wait_noerrors(self): x = proc.spawn(lambda : 1) y = proc.spawn(lambda : 2) z = proc.spawn(lambda : 3) self.assertEqual(proc.waitall([x, y, z]), [1, 2, 3]) e = coros.event() x.link(e) self.assertEqual(e.wait(), 1) x.unlink(e) e = coros.event() x.link(e) self.assertEqual(e.wait(), 1) self.assertEqual([proc.waitall([X]) for X in [x, y, z]], [[1], [2], [3]]) def test_wait_error(self): def x(): sleep(DELAY) return 1 x = proc.spawn(x) z = proc.spawn(lambda : 3) y = proc.spawn(int, 'badint') y.link(x) x.link(y) y.link(z) z.link(y) self.assertRaises(ValueError, proc.waitall, [x, y, z]) self.assertRaises(proc.LinkedFailed, proc.waitall, [x]) self.assertEqual(proc.waitall([z]), [3]) self.assertRaises(ValueError, proc.waitall, [y]) def test_wait_all_exception_order(self): # if there're several exceptions raised, the earliest one must be raised by wait def badint(): sleep(0.1) int('first') a = proc.spawn(badint) b = proc.spawn(int, 'second') try: proc.waitall([a, b]) except ValueError, ex: assert 'second' in str(ex), repr(str(ex)) def test_multiple_listeners_error(self): # if there was an error while calling a callback # it should not prevent the other listeners from being called # also, all of the errors should be logged, check the output # manually that they are p = proc.spawn(lambda : 5) results = [] def listener1(*args): results.append(10) 1/0 def listener2(*args): results.append(20) 2/0 def listener3(*args): 3/0 p.link(listener1) p.link(listener2) p.link(listener3) sleep(DELAY*10) assert results in [[10, 20], [20, 10]], results p = proc.spawn(int, 'hello') results = [] p.link(listener1) p.link(listener2) p.link(listener3) sleep(DELAY*10) assert results in [[10, 20], [20, 10]], results def _test_multiple_listeners_error_unlink(self, p): # notification must not happen after unlink even # though notification process has been already started results = [] def listener1(*args): p.unlink(listener2) results.append(5) 1/0 def listener2(*args): p.unlink(listener1) results.append(5) 2/0 def listener3(*args): 3/0 p.link(listener1) p.link(listener2) p.link(listener3) sleep(DELAY*10) assert results == [5], results def test_multiple_listeners_error_unlink_Proc(self): p = proc.spawn(lambda : 5) self._test_multiple_listeners_error_unlink(p) def test_multiple_listeners_error_unlink_Source(self): p = proc.Source() proc.spawn(p.send, 6) self._test_multiple_listeners_error_unlink(p) def test_killing_unlinked(self): e = coros.event() def func(): try: 1/0 except: e.send_exception(*sys.exc_info()) p = proc.spawn_link(func) try: try: e.wait() except 
ZeroDivisionError: pass finally: p.unlink() # this disables LinkedCompleted that otherwise would be raised by the next line sleep(DELAY) if __name__=='__main__': unittest.main() diff --git a/greentest/test__refcount.py b/greentest/test__refcount.py index b78f408..139469b 100644 --- a/greentest/test__refcount.py +++ b/greentest/test__refcount.py @@ -1,106 +1,106 @@ # Copyright (c) 2008 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """This test checks that socket instances (not GreenSockets but underlying sockets) are not leaked by the hub. """ #import sys import unittest -from eventlet.green import socket -from eventlet.green.thread import start_new_thread -from eventlet.green.time import sleep +from eventlib.green import socket +from eventlib.green.thread import start_new_thread +from eventlib.green.time import sleep import weakref import gc address = ('0.0.0.0', 7878) SOCKET_TIMEOUT = 0.1 def init_server(): s = socket.socket() s.settimeout(SOCKET_TIMEOUT) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(address) s.listen(5) return s def handle_request(s, raise_on_timeout): try: conn, address = s.accept() except socket.timeout: if raise_on_timeout: raise else: return #print 'handle_request - accepted' res = conn.recv(100) assert res == 'hello', `res` #print 'handle_request - recvd %r' % res res = conn.send('bye') #print 'handle_request - sent %r' % res #print 'handle_request - conn refcount: %s' % sys.getrefcount(conn) #conn.close() def make_request(): #print 'make_request' s = socket.socket() s.connect(address) #print 'make_request - connected' res = s.send('hello') #print 'make_request - sent %s' % res res = s.recv(100) assert res == 'bye', `res` #print 'make_request - recvd %r' % res #s.close() def run_interaction(run_client): s = init_server() start_new_thread(handle_request, (s, run_client)) if run_client: start_new_thread(make_request, ()) sleep(0.1+SOCKET_TIMEOUT) #print sys.getrefcount(s.fd) #s.close() return weakref.ref(s.fd) def run_and_check(run_client): w = run_interaction(run_client=run_client) if w(): print gc.get_referrers(w()) for x in gc.get_referrers(w()): print x for y in gc.get_referrers(x): print '-', y raise AssertionError('server should be dead by now') class test(unittest.TestCase): def test_clean_exit(self): run_and_check(True) run_and_check(True) def test_timeout_exit(self): run_and_check(False) run_and_check(False) if __name__=='__main__': unittest.main() diff --git a/greentest/test__socket_errors.py 
b/greentest/test__socket_errors.py index ffaef09..a137d74 100644 --- a/greentest/test__socket_errors.py +++ b/greentest/test__socket_errors.py @@ -1,42 +1,42 @@ # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import unittest -from eventlet import api +from eventlib import api if hasattr(api._threadlocal, 'hub'): - from eventlet.green import socket + from eventlib.green import socket else: import socket class TestSocketErrors(unittest.TestCase): def test_connection_refused(self): s = socket.socket() try: s.connect(('127.0.0.1', 81)) except socket.error, ex: code, text = ex.args assert code in [111, 61], (code, text) assert 'refused' in text.lower(), (code, text) if __name__=='__main__': unittest.main() diff --git a/greentest/test__timers.py b/greentest/test__timers.py index 2a75790..a07d87b 100644 --- a/greentest/test__timers.py +++ b/greentest/test__timers.py @@ -1,63 +1,63 @@ # Copyright (c) 2008 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
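test__socket_errors.py checks that a refused connection surfaces through the green socket module as an ordinary socket.error whose errno differs by platform (111, ECONNREFUSED, on Linux; 61 on BSD and OS X). The same check as a minimal sketch, not part of the patch; it assumes nothing is listening on the chosen port:

from eventlib.green import socket

s = socket.socket()
try:
    s.connect(('127.0.0.1', 81))   # port 81 is assumed to be closed
except socket.error, ex:
    code, text = ex.args
    # 111 on Linux, 61 on BSD/OS X, as the test above asserts
    print code, text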
import unittest -from eventlet.api import call_after, spawn, sleep +from eventlib.api import call_after, spawn, sleep class test(unittest.TestCase): def setUp(self): self.lst = [1] def test_timer_fired(self): def func(): call_after(0.1, self.lst.pop) sleep(0.2) spawn(func) assert self.lst == [1], self.lst sleep(0.3) assert self.lst == [], self.lst def test_timer_cancelled_upon_greenlet_exit(self): def func(): call_after(0.1, self.lst.pop) spawn(func) assert self.lst == [1], self.lst sleep(0.2) assert self.lst == [1], self.lst def test_spawn_is_not_cancelled(self): def func(): spawn(self.lst.pop) # exiting immediatelly, but self.lst.pop must be called spawn(func) sleep(0.1) assert self.lst == [], self.lst if __name__=='__main__': unittest.main() diff --git a/greentest/test__twistedutil.py b/greentest/test__twistedutil.py index 7e5c8ad..28cf5cd 100644 --- a/greentest/test__twistedutil.py +++ b/greentest/test__twistedutil.py @@ -1,57 +1,57 @@ # Copyright (c) 2008 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
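The two cancellation cases in test__timers.py make the same point for call_after that the hub test made for local timers: a call_after timer created inside a greenlet is cancelled when that greenlet exits, whereas a spawn() from the same greenlet still runs. Both behaviours combined into one sketch, not part of the patch:

from eventlib.api import call_after, spawn, sleep

lst = [1, 2]

def short_lived():
    call_after(0.1, lst.pop)   # local timer: cancelled when this greenlet exits
    spawn(lst.pop)             # spawned greenlet: runs regardless

spawn(short_lived)
sleep(0.3)
print lst                      # expected, per the tests above: [1]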
from twisted.internet import reactor from greentest import exit_unless_twisted exit_unless_twisted() import unittest from twisted.internet.error import DNSLookupError from twisted.internet import defer from twisted.python.failure import Failure -from eventlet.twistedutil import block_on +from eventlib.twistedutil import block_on class Test(unittest.TestCase): def test_block_on_success(self): from twisted.internet import reactor d = reactor.resolver.getHostByName('www.google.com') ip = block_on(d) assert len(ip.split('.'))==4, ip ip2 = block_on(d) assert ip == ip2, (ip, ip2) def test_block_on_fail(self): from twisted.internet import reactor d = reactor.resolver.getHostByName('xxx') self.assertRaises(DNSLookupError, block_on, d) def test_block_on_already_succeed(self): d = defer.succeed('hey corotwine') res = block_on(d) assert res == 'hey corotwine', `res` def test_block_on_already_failed(self): d = defer.fail(Failure(ZeroDivisionError())) self.assertRaises(ZeroDivisionError, block_on, d) if __name__=='__main__': unittest.main() diff --git a/greentest/test__twistedutil_protocol.py b/greentest/test__twistedutil_protocol.py index aa11148..6d84af5 100644 --- a/greentest/test__twistedutil_protocol.py +++ b/greentest/test__twistedutil_protocol.py @@ -1,235 +1,235 @@ # Copyright (c) 2008-2009 AG Projects # Author: Denis Bilenko # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. from twisted.internet import reactor from greentest import exit_unless_twisted exit_unless_twisted() import unittest from twisted.internet.error import ConnectionDone -import eventlet.twistedutil.protocol as pr -from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport -from eventlet.api import spawn, sleep, with_timeout, call_after -from eventlet.coros import event +import eventlib.twistedutil.protocol as pr +from eventlib.twistedutil.protocols.basic import LineOnlyReceiverTransport +from eventlib.api import spawn, sleep, with_timeout, call_after +from eventlib.coros import event try: - from eventlet.green import socket + from eventlib.green import socket except SyntaxError: socket = None DELAY=0.01 if socket is not None: def setup_server_socket(self, delay=DELAY, port=0): s = socket.socket() s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(('127.0.0.1', port)) port = s.getsockname()[1] s.listen(5) s.settimeout(delay*3) def serve(): conn, addr = s.accept() conn.settimeout(delay+1) try: hello = conn.makefile().readline()[:-2] except socket.timeout: return conn.sendall('you said %s. 
' % hello) sleep(delay) conn.sendall('BYE') sleep(delay) #conn.close() spawn(serve) return port def setup_server_SpawnFactory(self, delay=DELAY, port=0): def handle(conn): port.stopListening() try: hello = conn.readline() except ConnectionDone: return conn.write('you said %s. ' % hello) sleep(delay) conn.write('BYE') sleep(delay) conn.loseConnection() port = reactor.listenTCP(0, pr.SpawnFactory(handle, LineOnlyReceiverTransport)) return port.getHost().port class TestCase(unittest.TestCase): transportBufferSize = None @property def connector(self): return pr.GreenClientCreator(reactor, self.gtransportClass, self.transportBufferSize) def setUp(self): port = self.setup_server() self.conn = self.connector.connectTCP('127.0.0.1', port) if self.transportBufferSize is not None: self.assertEqual(self.transportBufferSize, self.conn.transport.bufferSize) class TestUnbufferedTransport(TestCase): gtransportClass = pr.UnbufferedTransport setup_server = setup_server_SpawnFactory def test_full_read(self): self.conn.write('hello\r\n') self.assertEqual(self.conn.read(), 'you said hello. BYE') self.assertEqual(self.conn.read(), '') self.assertEqual(self.conn.read(), '') def test_iterator(self): self.conn.write('iterator\r\n') self.assertEqual('you said iterator. BYE', ''.join(self.conn)) class TestUnbufferedTransport_bufsize1(TestUnbufferedTransport): transportBufferSize = 1 setup_server = setup_server_SpawnFactory class TestGreenTransport(TestUnbufferedTransport): gtransportClass = pr.GreenTransport setup_server = setup_server_SpawnFactory def test_read(self): self.conn.write('hello\r\n') self.assertEqual(self.conn.read(9), 'you said ') self.assertEqual(self.conn.read(999), 'hello. BYE') self.assertEqual(self.conn.read(9), '') self.assertEqual(self.conn.read(1), '') self.assertEqual(self.conn.recv(9), '') self.assertEqual(self.conn.recv(1), '') def test_read2(self): self.conn.write('world\r\n') self.assertEqual(self.conn.read(), 'you said world. BYE') self.assertEqual(self.conn.read(), '') self.assertEqual(self.conn.recv(), '') def test_iterator(self): self.conn.write('iterator\r\n') self.assertEqual('you said iterator. BYE', ''.join(self.conn)) _tests = [x for x in locals().keys() if x.startswith('test_')] def test_resume_producing(self): for test in self._tests: self.setUp() self.conn.resumeProducing() getattr(self, test)() def test_pause_producing(self): self.conn.pauseProducing() self.conn.write('hi\r\n') result = with_timeout(DELAY*10, self.conn.read, timeout_value='timed out') self.assertEqual('timed out', result) def test_pauseresume_producing(self): self.conn.pauseProducing() call_after(DELAY*5, self.conn.resumeProducing) self.conn.write('hi\r\n') result = with_timeout(DELAY*10, self.conn.read, timeout_value='timed out') self.assertEqual('you said hi. BYE', result) class TestGreenTransport_bufsize1(TestGreenTransport): transportBufferSize = 1 # class TestGreenTransportError(TestCase): # setup_server = setup_server_SpawnFactory # gtransportClass = pr.GreenTransport # # def test_read_error(self): # self.conn.write('hello\r\n') # sleep(DELAY*1.5) # make sure the rest of data arrives # try: # 1/0 # except: # #self.conn.loseConnection(failure.Failure()) # does not work, why? # spawn(self.conn._queue.send_exception, *sys.exc_info()) # self.assertEqual(self.conn.read(9), 'you said ') # self.assertEqual(self.conn.read(7), 'hello. 
') # self.assertEqual(self.conn.read(9), 'BYE') # self.assertRaises(ZeroDivisionError, self.conn.read, 9) # self.assertEqual(self.conn.read(1), '') # self.assertEqual(self.conn.read(1), '') # # def test_recv_error(self): # self.conn.write('hello') # self.assertEqual('you said hello. ', self.conn.recv()) # sleep(DELAY*1.5) # make sure the rest of data arrives # try: # 1/0 # except: # #self.conn.loseConnection(failure.Failure()) # does not work, why? # spawn(self.conn._queue.send_exception, *sys.exc_info()) # self.assertEqual('BYE', self.conn.recv()) # self.assertRaises(ZeroDivisionError, self.conn.recv, 9) # self.assertEqual('', self.conn.recv(1)) # self.assertEqual('', self.conn.recv()) # if socket is not None: class TestUnbufferedTransport_socketserver(TestUnbufferedTransport): setup_server = setup_server_socket class TestUnbufferedTransport_socketserver_bufsize1(TestUnbufferedTransport): transportBufferSize = 1 setup_server = setup_server_socket class TestGreenTransport_socketserver(TestGreenTransport): setup_server = setup_server_socket class TestGreenTransport_socketserver_bufsize1(TestGreenTransport): transportBufferSize = 1 setup_server = setup_server_socket class TestTLSError(unittest.TestCase): def test_server_connectionMade_never_called(self): # trigger case when protocol instance is created, # but it's connectionMade is never called from gnutls.interfaces.twisted import TLSContext, X509Credentials from gnutls.errors import GNUTLSError cred = X509Credentials(None, None) ctx = TLSContext(cred) ev = event() def handle(conn): ev.send("handle must not be called") s = reactor.listenTLS(0, pr.SpawnFactory(handle, LineOnlyReceiverTransport), ctx) creator = pr.GreenClientCreator(reactor, LineOnlyReceiverTransport) try: conn = creator.connectTLS('127.0.0.1', s.getHost().port, ctx) except GNUTLSError: pass assert ev.poll() is None, repr(ev.poll()) try: import gnutls.interfaces.twisted except ImportError: del TestTLSError if __name__=='__main__': unittest.main() diff --git a/greentest/test_socket.py b/greentest/test_socket.py index 135a5c3..be0f234 100755 --- a/greentest/test_socket.py +++ b/greentest/test_socket.py @@ -1,1030 +1,1030 @@ #!/usr/bin/python2 import unittest from greentest import test_support -from eventlet.green import socket -from eventlet.green import select -from eventlet.green import time -from eventlet.green import thread, threading +from eventlib.green import socket +from eventlib.green import select +from eventlib.green import time +from eventlib.green import thread, threading import Queue import sys import array from weakref import proxy import signal PORT = 50007 HOST = 'localhost' MSG = 'Michael Gilfix was here\n' class SocketTCPTest(unittest.TestCase): def setUp(self): self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) global PORT PORT = test_support.bind_port(self.serv, HOST, PORT) self.serv.listen(1) def tearDown(self): self.serv.close() self.serv = None class SocketUDPTest(unittest.TestCase): def setUp(self): self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) global PORT PORT = test_support.bind_port(self.serv, HOST, PORT) def tearDown(self): self.serv.close() self.serv = None class ThreadableTest: """Threadable Test class The ThreadableTest class makes it easy to create a threaded client/server pair from an existing unit test. 
To create a new threaded class from an existing unit test, use multiple inheritance: class NewClass (OldClass, ThreadableTest): pass This class defines two new fixture functions with obvious purposes for overriding: clientSetUp () clientTearDown () Any new test functions within the class must then define tests in pairs, where the test name is preceeded with a '_' to indicate the client portion of the test. Ex: def testFoo(self): # Server portion def _testFoo(self): # Client portion Any exceptions raised by the clients during their tests are caught and transferred to the main thread to alert the testing framework. Note, the server setup function cannot call any blocking functions that rely on the client thread during setup, unless serverExplicityReady() is called just before the blocking call (such as in setting up a client/server connection and performing the accept() in setUp(). """ def __init__(self): # Swap the true setup function self.__setUp = self.setUp self.__tearDown = self.tearDown self.setUp = self._setUp self.tearDown = self._tearDown def serverExplicitReady(self): """This method allows the server to explicitly indicate that it wants the client thread to proceed. This is useful if the server is about to execute a blocking routine that is dependent upon the client thread during its setup routine.""" self.server_ready.set() def _setUp(self): self.server_ready = threading.Event() self.client_ready = threading.Event() self.done = threading.Event() self.queue = Queue.Queue(1) # Do some munging to start the client test. methodname = self.id() i = methodname.rfind('.') methodname = methodname[i+1:] test_method = getattr(self, '_' + methodname) self.client_thread = thread.start_new_thread( self.clientRun, (test_method,)) self.__setUp() if not self.server_ready.isSet(): self.server_ready.set() self.client_ready.wait() def _tearDown(self): self.__tearDown() self.done.wait() if not self.queue.empty(): msg = self.queue.get() self.fail(msg) def clientRun(self, test_func): self.server_ready.wait() self.client_ready.set() self.clientSetUp() if not callable(test_func): raise TypeError, "test_func must be a callable function" try: test_func() except Exception, strerror: self.queue.put(strerror) self.clientTearDown() def clientSetUp(self): raise NotImplementedError, "clientSetUp must be implemented." 
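The docstring above describes the paired-method convention used throughout this file: the unprefixed method runs as the server half in the main flow, and the '_'-prefixed twin runs as the client half in a green thread, with failures funnelled back through the queue. A minimal illustrative pair, not part of the patch, built on ThreadedTCPSocketTest (defined just below) and the module's HOST, PORT and MSG constants; the method names are made up:

class EchoTest(ThreadedTCPSocketTest):

    def testEcho(self):                        # server half
        conn, addr = self.serv.accept()
        conn.send(conn.recv(1024))

    def _testEcho(self):                       # client half, run in a green thread
        self.cli.connect((HOST, PORT))
        self.cli.send(MSG)
        self.assertEqual(self.cli.recv(1024), MSG)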
def clientTearDown(self): self.done.set() thread.exit() class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest): def __init__(self, methodName='runTest'): SocketTCPTest.__init__(self, methodName=methodName) ThreadableTest.__init__(self) def clientSetUp(self): self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def clientTearDown(self): self.cli.close() self.cli = None ThreadableTest.clientTearDown(self) class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): def __init__(self, methodName='runTest'): SocketUDPTest.__init__(self, methodName=methodName) ThreadableTest.__init__(self) def clientSetUp(self): self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) class SocketConnectedTest(ThreadedTCPSocketTest): def __init__(self, methodName='runTest'): ThreadedTCPSocketTest.__init__(self, methodName=methodName) def setUp(self): ThreadedTCPSocketTest.setUp(self) # Indicate explicitly we're ready for the client thread to # proceed and then perform the blocking call to accept self.serverExplicitReady() conn, addr = self.serv.accept() self.cli_conn = conn def tearDown(self): self.cli_conn.close() self.cli_conn = None ThreadedTCPSocketTest.tearDown(self) def clientSetUp(self): ThreadedTCPSocketTest.clientSetUp(self) self.cli.connect((HOST, PORT)) self.serv_conn = self.cli def clientTearDown(self): self.serv_conn.close() self.serv_conn = None ThreadedTCPSocketTest.clientTearDown(self) class SocketPairTest(unittest.TestCase, ThreadableTest): def __init__(self, methodName='runTest'): unittest.TestCase.__init__(self, methodName=methodName) ThreadableTest.__init__(self) def setUp(self): self.serv, self.cli = socket.socketpair() def tearDown(self): self.serv.close() self.serv = None def clientSetUp(self): pass def clientTearDown(self): self.cli.close() self.cli = None ThreadableTest.clientTearDown(self) ####################################################################### ## Begin Tests class GeneralModuleTests(unittest.TestCase): def test_weakref(self): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) p = proxy(s) self.assertEqual(p.fileno(), s.fileno()) s.close() s = None try: p.fileno() except ReferenceError: pass else: self.fail('Socket proxy still exists') def testSocketError(self): # Testing socket module exceptions def raise_error(*args, **kwargs): raise socket.error def raise_herror(*args, **kwargs): raise socket.herror def raise_gaierror(*args, **kwargs): raise socket.gaierror self.failUnlessRaises(socket.error, raise_error, "Error raising socket exception.") self.failUnlessRaises(socket.error, raise_herror, "Error raising socket exception.") self.failUnlessRaises(socket.error, raise_gaierror, "Error raising socket exception.") def testCrucialConstants(self): # Testing for mission critical constants socket.AF_INET socket.SOCK_STREAM socket.SOCK_DGRAM socket.SOCK_RAW socket.SOCK_RDM socket.SOCK_SEQPACKET socket.SOL_SOCKET socket.SO_REUSEADDR def testHostnameRes(self): # Testing hostname resolution mechanisms hostname = socket.gethostname() try: ip = socket.gethostbyname(hostname) except socket.error: # Probably name lookup wasn't set up right; skip this test return self.assert_(ip.find('.') >= 0, "Error resolving host to ip.") try: hname, aliases, ipaddrs = socket.gethostbyaddr(ip) except socket.error: # Probably a similar problem as above; skip this test return all_host_names = [hostname, hname] + aliases fqhn = socket.getfqdn(ip) if not fqhn in all_host_names: self.fail("Error testing host resolution mechanisms. 
(fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) def testRefCountGetNameInfo(self): # Testing reference count for getnameinfo import sys if hasattr(sys, "getrefcount"): try: # On some versions, this loses a reference orig = sys.getrefcount(__name__) socket.getnameinfo(__name__,0) except SystemError: if sys.getrefcount(__name__) <> orig: self.fail("socket.getnameinfo loses a reference") def testInterpreterCrash(self): # Making sure getnameinfo doesn't crash the interpreter try: # On some versions, this crashes the interpreter. socket.getnameinfo(('x', 0, 0, 0), 0) except socket.error: pass def testNtoH(self): # This just checks that htons etc. are their own inverse, # when looking at the lower 16 or 32 bits. sizes = {socket.htonl: 32, socket.ntohl: 32, socket.htons: 16, socket.ntohs: 16} for func, size in sizes.items(): mask = (1L<> sys.stderr, """\ WARNING: an attempt to connect to %r %s, in test_timeout. That may be legitimate, but is not the outcome we hoped for. If this message is seen often, test_timeout should be changed to use a more reliable address.""" % (ADDR, extra_msg) if test_support.verbose: print "test_timeout ..." # A service which issues a welcome banner (without need to write # anything). ADDR = "pop.gmail.com", 995 s = socket.socket() s.settimeout(30.0) try: s.connect(ADDR) except socket.timeout: error_msg('timed out') return except socket.error, exc: # In case connection is refused. if exc.args[0] == errno.ECONNREFUSED: error_msg('was refused') return else: raise ss = socket.ssl(s) # Read part of return welcome banner twice. ss.read(1) ss.read(1) s.close() def test_rude_shutdown(): if test_support.verbose: print "test_rude_shutdown ..." - from eventlet.green import threading + from eventlib.green import threading # Some random port to connect to. PORT = [9934] listener_ready = threading.Event() listener_gone = threading.Event() # `listener` runs in a thread. It opens a socket listening on PORT, and # sits in an accept() until the main thread connects. Then it rudely # closes the socket, and sets Event `listener_gone` to let the main thread # know the socket is gone. def listener(): s = socket.socket() PORT[0] = test_support.bind_port(s, '', PORT[0]) s.listen(5) listener_ready.set() s.accept() s = None # reclaim the socket object, which also closes it listener_gone.set() def connector(): listener_ready.wait() s = socket.socket() s.connect(('localhost', PORT[0])) listener_gone.wait() try: ssl_sock = socket.ssl(s) except socket.sslerror: pass else: raise test_support.TestFailed( 'connecting to closed SSL socket should have failed') t = threading.Thread(target=listener) t.start() connector() t.join() def test_rude_shutdown__write(): if test_support.verbose: print "test_rude_shutdown__variant ..." - from eventlet.green import threading + from eventlib.green import threading # Some random port to connect to. PORT = [9934] listener_ready = threading.Event() listener_gone = threading.Event() # `listener` runs in a thread. It opens a socket listening on PORT, and # sits in an accept() until the main thread connects. Then it rudely # closes the socket, and sets Event `listener_gone` to let the main thread # know the socket is gone. 
def listener(): s = socket.socket() PORT[0] = test_support.bind_port(s, '', PORT[0]) s.listen(5) listener_ready.set() s.accept() s = None # reclaim the socket object, which also closes it listener_gone.set() def connector(): listener_ready.wait() s = socket.socket() s.connect(('localhost', PORT[0])) listener_gone.wait() try: ssl_sock = socket.ssl(s) ssl_sock.write("hello") except socket.sslerror: pass else: raise test_support.TestFailed( 'connecting to closed SSL socket should have failed') t = threading.Thread(target=listener) t.start() connector() t.join() class Test(unittest.TestCase): test_basic = lambda self: test_basic() test_timeout = lambda self: test_timeout() test_rude_shutdown = lambda self: test_rude_shutdown() test_rude_shutdown__write = lambda self: test_rude_shutdown__write() def test_main(): if not hasattr(socket, "ssl"): raise test_support.TestSkipped("socket module has no ssl support") test_support.run_unittest(Test) if __name__ == "__main__": test_main() diff --git a/greentest/test_socketserver.py b/greentest/test_socketserver.py index 0361f89..bde3409 100644 --- a/greentest/test_socketserver.py +++ b/greentest/test_socketserver.py @@ -1,245 +1,245 @@ # Test suite for SocketServer.py # converted to unittest (Denis) from greentest import test_support from greentest.test_support import (verbose, verify, TESTFN, TestSkipped, reap_children) test_support.requires('network') -from eventlet.green.SocketServer import * -from eventlet.green import socket +from eventlib.green.SocketServer import * +from eventlib.green import socket import errno -from eventlet.green import select -from eventlet.green import time -from eventlet.green import threading +from eventlib.green import select +from eventlib.green import time +from eventlib.green import threading import sys import os import unittest NREQ = 3 DELAY = 0.05 class MyMixinHandler: def handle(self): time.sleep(DELAY) line = self.rfile.readline() time.sleep(DELAY) self.wfile.write(line) class MyStreamHandler(MyMixinHandler, StreamRequestHandler): pass class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler): pass class MyMixinServer: def serve_a_few(self): for i in range(NREQ): self.handle_request() def handle_error(self, request, client_address): self.close_request(request) self.server_close() raise teststring = "hello world\n" def receive(sock, n, timeout=5): r, w, x = select.select([sock], [], [], timeout) if sock in r: return sock.recv(n) else: raise RuntimeError, "timed out on %r" % (sock,) def testdgram(proto, addr): s = socket.socket(proto, socket.SOCK_DGRAM) s.sendto(teststring, addr) buf = data = receive(s, 100) while data and '\n' not in buf: data = receive(s, 100) buf += data verify(buf == teststring) s.close() def teststream(proto, addr): s = socket.socket(proto, socket.SOCK_STREAM) s.connect(addr) s.sendall(teststring) buf = data = receive(s, 100) while data and '\n' not in buf: data = receive(s, 100) buf += data verify(buf == teststring) s.close() class ServerThread(threading.Thread): def __init__(self, addr, svrcls, hdlrcls): threading.Thread.__init__(self) self.__addr = addr self.__svrcls = svrcls self.__hdlrcls = hdlrcls def run(self): class svrcls(MyMixinServer, self.__svrcls): pass if verbose: print "thread: creating server" svr = svrcls(self.__addr, self.__hdlrcls) # pull the address out of the server in case it changed # this can happen if another process is using the port addr = svr.server_address if addr: self.__addr = addr if sys.version_info[:2] >= (2, 5): if self.__addr != svr.socket.getsockname(): 
raise RuntimeError('server_address was %s, expected %s' % (self.__addr, svr.socket.getsockname())) if verbose: print "thread: serving three times" svr.serve_a_few() if verbose: print "thread: done" seed = 0 def pickport(): global seed seed += 1 return 10000 + (os.getpid() % 1000)*10 + seed host = "localhost" testfiles = [] def pickaddr(proto): if proto == socket.AF_INET: return (host, pickport()) else: fn = TESTFN + str(pickport()) if os.name == 'os2': # AF_UNIX socket names on OS/2 require a specific prefix # which can't include a drive letter and must also use # backslashes as directory separators if fn[1] == ':': fn = fn[2:] if fn[0] in (os.sep, os.altsep): fn = fn[1:] fn = os.path.join('\socket', fn) if os.sep == '/': fn = fn.replace(os.sep, os.altsep) else: fn = fn.replace(os.altsep, os.sep) testfiles.append(fn) return fn def cleanup(): for fn in testfiles: try: os.remove(fn) except os.error: pass testfiles[:] = [] def testloop(proto, servers, hdlrcls, testfunc): for svrcls in servers: addr = pickaddr(proto) if verbose: print "ADDR =", addr print "CLASS =", svrcls t = ServerThread(addr, svrcls, hdlrcls) if verbose: print "server created" t.start() if verbose: print "server running" for i in range(NREQ): time.sleep(DELAY) if verbose: print "test client", i testfunc(proto, addr) if verbose: print "waiting for server" t.join() if verbose: print "done" class ForgivingTCPServer(TCPServer): # prevent errors if another process is using the port we want def server_bind(self): host, default_port = self.server_address # this code shamelessly stolen from test.test_support # the ports were changed to protect the innocent import sys for port in [default_port, 3434, 8798, 23833]: try: self.server_address = host, port TCPServer.server_bind(self) break except socket.error, (err, msg): if err != errno.EADDRINUSE: raise print >>sys.__stderr__, \ ' WARNING: failed to listen on port %d, trying another' % port tcpservers = [ForgivingTCPServer, ThreadingTCPServer] if hasattr(os, 'fork') and os.name not in ('os2',): tcpservers.append(ForkingTCPServer) udpservers = [UDPServer, ThreadingUDPServer] if hasattr(os, 'fork') and os.name not in ('os2',): udpservers.append(ForkingUDPServer) if not hasattr(socket, 'AF_UNIX'): streamservers = [] dgramservers = [] else: class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass streamservers = [UnixStreamServer, ThreadingUnixStreamServer] if hasattr(os, 'fork') and os.name not in ('os2',): streamservers.append(ForkingUnixStreamServer) class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer] if hasattr(os, 'fork') and os.name not in ('os2',): dgramservers.append(ForkingUnixDatagramServer) def sloppy_cleanup(): # See http://python.org/sf/1540386 # We need to reap children here otherwise a child from one server # can be left running for the next server and cause a test failure. 
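ForgivingTCPServer above retries its bind on a short list of fallback ports so a stray process holding the default port does not sink the run; test_support.bind_port, further below, applies the same idea. The retry in isolation, as a sketch that is not part of the patch (the helper name and port list are illustrative):

import errno
from eventlib.green import socket

def bind_with_fallback(sock, host, ports):
    # try each port in turn; anything other than "address in use" is re-raised
    for port in ports:
        try:
            sock.bind((host, port))
            if port == 0:
                port = sock.getsockname()[1]   # 0 asks the OS to pick a free port
            return port
        except socket.error, (err, msg):
            if err != errno.EADDRINUSE:
                raise
    raise RuntimeError('no usable port among %r' % (ports,))

s = socket.socket()
print bind_with_fallback(s, '127.0.0.1', [54321, 9907, 0])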
time.sleep(DELAY) reap_children() def testall(): testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream) sloppy_cleanup() testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram) if hasattr(socket, 'AF_UNIX'): sloppy_cleanup() testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream) # Alas, on Linux (at least) recvfrom() doesn't return a meaningful # client address so this cannot work: ##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram) class Test(unittest.TestCase): def tearDown(self): sloppy_cleanup() for tcpserver in tcpservers: n = tcpserver.__name__ exec """def test_%s(self): testloop(socket.AF_INET, [%s], MyStreamHandler, teststream)""" % (n,n) for udpserver in udpservers: n = udpserver.__name__ exec """def test_%s(self): testloop(socket.AF_INET, [%s], MyDatagramHandler, testdgram)""" % (n,n) if hasattr(socket, 'AF_UNIX'): for streamserver in streamservers: n = streamserver.__name__ exec """def test_%s(self): testloop(socket.AF_UNIX, [%s], MyStreamHandler, teststream)""" % (n,n) def testall(): test_support.run_unittest(Test) def test_main(): import imp if imp.lock_held(): # If the import lock is held, the threads will hang. raise TestSkipped("can't run when import lock is held") try: testall() finally: cleanup() reap_children() if __name__ == "__main__": test_main() diff --git a/greentest/test_support.py b/greentest/test_support.py index 0b4494c..58c8404 100644 --- a/greentest/test_support.py +++ b/greentest/test_support.py @@ -1,530 +1,530 @@ """Supporting definitions for the Python regression tests.""" if __name__ != 'greentest.test_support': raise ImportError, 'test_support must be imported from the test package' import sys class Error(Exception): """Base class for regression test exceptions.""" class TestFailed(Error): """Test failed.""" class TestSkipped(Error): """Test skipped. This can be raised to indicate that a test was deliberatly skipped, but not because a feature wasn't available. For example, if some resource can't be used, such as the network appears to be unavailable, this should be raised instead of TestFailed. """ class ResourceDenied(TestSkipped): """Test skipped because it requested a disallowed resource. This is raised when a test calls requires() for a resource that has not be enabled. It is used to distinguish between expected and unexpected skips. """ verbose = 1 # Flag set to 0 by regrtest.py use_resources = None # Flag set to [] by regrtest.py max_memuse = 0 # Disable bigmem tests (they will still be run with # small sizes, to make sure they work.) # _original_stdout is meant to hold stdout at the time regrtest began. # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. # The point is to have some flavor of stdout the user can actually see. _original_stdout = None def record_original_stdout(stdout): global _original_stdout _original_stdout = stdout def get_original_stdout(): return _original_stdout or sys.stdout def unload(name): try: del sys.modules[name] except KeyError: pass def unlink(filename): import os try: os.unlink(filename) except OSError: pass def forget(modname): '''"Forget" a module was ever imported by removing it from sys.modules and deleting any .pyc and .pyo files.''' unload(modname) import os for dirname in sys.path: unlink(os.path.join(dirname, modname + os.extsep + 'pyc')) # Deleting the .pyo file cannot be within the 'try' for the .pyc since # the chance exists that there is no .pyc (and thus the 'try' statement # is exited) but there is a .pyo file. 
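The Test class in test_socketserver.py stamps out one test method per server class with exec. The same idea expressed with setattr and a closure, shown only as a sketch that is not part of the patch and that assumes the surrounding module's tcpservers, testloop, MyStreamHandler, teststream, sloppy_cleanup and socket names are in scope:

import unittest

def _make_stream_test(svrcls):
    def test(self):
        testloop(socket.AF_INET, [svrcls], MyStreamHandler, teststream)
    test.__name__ = 'test_%s' % svrcls.__name__
    return test

class GeneratedTest(unittest.TestCase):
    def tearDown(self):
        sloppy_cleanup()

for svrcls in tcpservers:
    setattr(GeneratedTest, 'test_%s' % svrcls.__name__, _make_stream_test(svrcls))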
unlink(os.path.join(dirname, modname + os.extsep + 'pyo')) def is_resource_enabled(resource): """Test whether a resource is enabled. Known resources are set by regrtest.py.""" return use_resources is not None and resource in use_resources def requires(resource, msg=None): """Raise ResourceDenied if the specified resource is not available. If the caller's module is __main__ then automatically return True. The possibility of False being returned occurs when regrtest.py is executing.""" # see if the caller's module is __main__ - if so, treat as if # the resource was set return if sys._getframe().f_back.f_globals.get("__name__") == "__main__": return if not is_resource_enabled(resource): if msg is None: msg = "Use of the `%s' resource not enabled" % resource raise ResourceDenied(msg) def bind_port(sock, host='', preferred_port=54321): """Try to bind the sock to a port. If we are running multiple tests and we don't try multiple ports, the test can fails. This makes the test more robust.""" import socket, errno # Find some random ports that hopefully no one is listening on. # Ideally each test would clean up after itself and not continue listening # on any ports. However, this isn't the case. The last port (0) is # a stop-gap that asks the O/S to assign a port. Whenever the warning # message below is printed, the test that is listening on the port should # be fixed to close the socket at the end of the test. # Another reason why we can't use a port is another process (possibly # another instance of the test suite) is using the same port. for port in [preferred_port, 9907, 10243, 32999, 0]: try: sock.bind((host, port)) if port == 0: port = sock.getsockname()[1] return port except socket.error, (err, msg): if err != errno.EADDRINUSE: raise print >>sys.__stderr__, \ ' WARNING: failed to listen on port %d, trying another' % port raise TestFailed, 'unable to find port to listen on' FUZZ = 1e-6 def fcmp(x, y): # fuzzy comparison function if type(x) == type(0.0) or type(y) == type(0.0): try: x, y = coerce(x, y) fuzz = (abs(x) + abs(y)) * FUZZ if abs(x-y) <= fuzz: return 0 except: pass elif type(x) == type(y) and type(x) in (type(()), type([])): for i in range(min(len(x), len(y))): outcome = fcmp(x[i], y[i]) if outcome != 0: return outcome return cmp(len(x), len(y)) return cmp(x, y) try: unicode have_unicode = 1 except NameError: have_unicode = 0 is_jython = sys.platform.startswith('java') import os # Filename used for testing if os.name == 'java': # Jython disallows @ in module names TESTFN = '$test' elif os.name == 'riscos': TESTFN = 'testfile' else: TESTFN = '@test' # Unicode name only used if TEST_FN_ENCODING exists for the platform. if have_unicode: # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding() # TESTFN_UNICODE is a filename that can be encoded using the # file system encoding, but *not* with the default (ascii) encoding if isinstance('', unicode): # python -U # XXX perhaps unicode() should accept Unicode strings? TESTFN_UNICODE = "@test-\xe0\xf2" else: # 2 latin characters. TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1") TESTFN_ENCODING = sys.getfilesystemencoding() # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be # able to be encoded by *either* the default or filesystem encoding. # This test really only makes sense on Windows NT platforms # which have special Unicode support in posixmodule. 
if (not hasattr(sys, "getwindowsversion") or sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME TESTFN_UNICODE_UNENCODEABLE = None else: # Japanese characters (I think - from bug 846133) TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"') try: # XXX - Note - should be using TESTFN_ENCODING here - but for # Windows, "mbcs" currently always operates as if in # errors=ignore' mode - hence we get '?' characters rather than # the exception. 'Latin1' operates as we expect - ie, fails. # See [ 850997 ] mbcs encoding ignores errors TESTFN_UNICODE_UNENCODEABLE.encode("Latin1") except UnicodeEncodeError: pass else: print \ 'WARNING: The filename %r CAN be encoded by the filesystem. ' \ 'Unicode filename tests may not be effective' \ % TESTFN_UNICODE_UNENCODEABLE # Make sure we can write to TESTFN, try in /tmp if we can't fp = None try: fp = open(TESTFN, 'w+') except IOError: TMP_TESTFN = os.path.join('/tmp', TESTFN) try: fp = open(TMP_TESTFN, 'w+') TESTFN = TMP_TESTFN del TMP_TESTFN except IOError: print ('WARNING: tests will fail, unable to write to: %s or %s' % (TESTFN, TMP_TESTFN)) if fp is not None: fp.close() unlink(TESTFN) del os, fp def findfile(file, here=__file__): """Try to find a file on sys.path and the working directory. If it is not found the argument passed to the function is returned (this does not necessarily signal failure; could still be the legitimate path).""" import os if os.path.isabs(file): return file path = sys.path path = [os.path.dirname(here)] + path for dn in path: fn = os.path.join(dn, file) if os.path.exists(fn): return fn return file def verify(condition, reason='test failed'): """Verify that condition is true. If not, raise TestFailed. The optional argument reason can be given to provide a better error text. """ if not condition: raise TestFailed(reason) def vereq(a, b): """Raise TestFailed if a == b is false. This is better than verify(a == b) because, in case of failure, the error message incorporates repr(a) and repr(b) so you can see the inputs. Note that "not (a == b)" isn't necessarily the same as "a != b"; the former is tested. """ if not (a == b): raise TestFailed, "%r == %r" % (a, b) def sortdict(dict): "Like repr(dict), but in sorted order." items = dict.items() items.sort() reprpairs = ["%r: %r" % pair for pair in items] withcommas = ", ".join(reprpairs) return "{%s}" % withcommas def check_syntax(statement): try: compile(statement, '', 'exec') except SyntaxError: pass else: print 'Missing SyntaxError: "%s"' % statement def open_urlresource(url): import urllib, urlparse import os.path filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL! for path in [os.path.curdir, os.path.pardir]: fn = os.path.join(path, filename) if os.path.exists(fn): return open(fn) requires('urlfetch') print >> get_original_stdout(), '\tfetching %s ...' % url fn, _ = urllib.urlretrieve(url, filename) return open(fn) #======================================================================= # Decorator for running a function in a different locale, correctly resetting # it afterwards. 
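# A hedged usage sketch (the category string and locale names are only
# illustrative; which locales are installed varies by platform):
#
#     @run_with_locale('LC_NUMERIC', 'fr_FR.UTF-8', 'fr_FR', '')
#     def test_locale_sensitive_formatting(self):
#         ...  # runs under the first locale that could be set; the original
#              # locale is restored in the decorator's finally clause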
def run_with_locale(catstr, *locales): def decorator(func): def inner(*args, **kwds): try: import locale category = getattr(locale, catstr) orig_locale = locale.setlocale(category) except AttributeError: # if the test author gives us an invalid category string raise except: # cannot retrieve original locale, so do nothing locale = orig_locale = None else: for loc in locales: try: locale.setlocale(category, loc) break except: pass # now run the function, resetting the locale on exceptions try: return func(*args, **kwds) finally: if locale and orig_locale: locale.setlocale(category, orig_locale) inner.func_name = func.func_name inner.__doc__ = func.__doc__ return inner return decorator #======================================================================= # Big-memory-test support. Separate from 'resources' because memory use should be configurable. # Some handy shorthands. Note that these are used for byte-limits as well # as size-limits, in the various bigmem tests _1M = 1024*1024 _1G = 1024 * _1M _2G = 2 * _1G # Hack to get at the maximum value an internal index can take. class _Dummy: def __getslice__(self, i, j): return j MAX_Py_ssize_t = _Dummy()[:] def set_memlimit(limit): import re global max_memuse sizes = { 'k': 1024, 'm': _1M, 'g': _1G, 't': 1024*_1G, } m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, re.IGNORECASE | re.VERBOSE) if m is None: raise ValueError('Invalid memory limit %r' % (limit,)) memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) if memlimit > MAX_Py_ssize_t: memlimit = MAX_Py_ssize_t if memlimit < _2G - 1: raise ValueError('Memory limit %r too low to be useful' % (limit,)) max_memuse = memlimit def bigmemtest(minsize, memuse, overhead=5*_1M): """Decorator for bigmem tests. 'minsize' is the minimum useful size for the test (in arbitrary, test-interpreted units.) 'memuse' is the number of 'bytes per size' for the test, or a good estimate of it. 'overhead' specifies fixed overhead, independant of the testsize, and defaults to 5Mb. The decorator tries to guess a good value for 'size' and passes it to the decorated test function. If minsize * memuse is more than the allowed memory use (as defined by max_memuse), the test is skipped. Otherwise, minsize is adjusted upward to use up to max_memuse. """ def decorator(f): def wrapper(self): if not max_memuse: # If max_memuse is 0 (the default), # we still want to run the tests with size set to a few kb, # to make sure they work. We still want to avoid using # too much memory, though, but we do that noisily. maxsize = 5147 self.failIf(maxsize * memuse + overhead > 20 * _1M) else: maxsize = int((max_memuse - overhead) / memuse) if maxsize < minsize: # Really ought to print 'test skipped' or something if verbose: sys.stderr.write("Skipping %s because of memory " "constraint\n" % (f.__name__,)) return # Try to keep some breathing room in memory use maxsize = max(maxsize - 50 * _1M, minsize) return f(self, maxsize) wrapper.minsize = minsize wrapper.memuse = memuse wrapper.overhead = overhead return wrapper return decorator def bigaddrspacetest(f): """Decorator for tests that fill the address space.""" def wrapper(self): if max_memuse < MAX_Py_ssize_t: if verbose: sys.stderr.write("Skipping %s because of memory " "constraint\n" % (f.__name__,)) else: return f(self) return wrapper #======================================================================= # Preliminary PyUNIT integration. 
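# A minimal sketch of how the helpers below are typically driven from a test
# module (the class name is illustrative; the modules touched by this patch do
# the same thing in their test_main() functions):
#
#     def test_main():
#         test_support.run_unittest(SomeGreenTestCase)
#
# run_suite()/run_unittest() raise TestFailed on any failure or error instead
# of letting unittest print its own summary.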
import unittest class BasicTestRunner: def run(self, test): result = unittest.TestResult() test(result) return result def run_suite(suite, testclass=None): """Run tests from a unittest.TestSuite-derived class.""" if verbose: runner = unittest.TextTestRunner(sys.stdout, verbosity=2) else: runner = BasicTestRunner() result = runner.run(suite) if not result.wasSuccessful(): if len(result.errors) == 1 and not result.failures: err = result.errors[0][1] elif len(result.failures) == 1 and not result.errors: err = result.failures[0][1] else: if testclass is None: msg = "errors occurred; run in verbose mode for details" else: msg = "errors occurred in %s.%s" \ % (testclass.__module__, testclass.__name__) raise TestFailed(msg) raise TestFailed(err) def run_unittest(*classes): """Run tests from unittest.TestCase-derived classes.""" suite = unittest.TestSuite() for cls in classes: if isinstance(cls, (unittest.TestSuite, unittest.TestCase)): suite.addTest(cls) else: suite.addTest(unittest.makeSuite(cls)) if len(classes)==1: testclass = classes[0] else: testclass = None run_suite(suite, testclass) #======================================================================= # doctest driver. def run_doctest(module, verbosity=None): """Run doctest on the given module. Return (#failures, #tests). If optional argument verbosity is not specified (or is None), pass test_support's belief about verbosity on to doctest. Else doctest's usual behavior is used (it searches sys.argv for -v). """ import doctest if verbosity is None: verbosity = verbose else: verbosity = None # Direct doctest output (normally just errors) to real stdout; doctest # output shouldn't be compared by regrtest. save_stdout = sys.stdout sys.stdout = get_original_stdout() try: f, t = doctest.testmod(module, verbose=verbosity) if f: raise TestFailed("%d of %d doctests failed" % (f, t)) finally: sys.stdout = save_stdout if verbose: print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t) return f, t #======================================================================= # Threading support to prevent reporting refleaks when running regrtest.py -R def threading_setup(): - from eventlet.green import threading + from eventlib.green import threading return len(threading._active), len(threading._limbo) def threading_cleanup(num_active, num_limbo): - from eventlet.green import threading - from eventlet.green import time + from eventlib.green import threading + from eventlib.green import time _MAX_COUNT = 10 count = 0 while len(threading._active) != num_active and count < _MAX_COUNT: print threading._active count += 1 time.sleep(0.1) count = 0 while len(threading._limbo) != num_limbo and count < _MAX_COUNT: print threading._limbo count += 1 time.sleep(0.1) def reap_children(): """Use this function at the end of test_main() whenever sub-processes are started. This will help ensure that no extra children (zombies) stick around to hog resources and create problems when looking for refleaks. """ # Reap all our dead child processes so we don't leave zombies around. # These hog resources and might be causing some of the buildbots to die. import os if hasattr(os, 'waitpid'): any_process = -1 while True: try: # This will raise an exception on Windows. That's ok. 
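# With os.WNOHANG, waitpid() returns (0, 0) when no exited child is pending,
# so the pid == 0 check below ends the loop once every zombie has been reaped.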
pid, status = os.waitpid(any_process, os.WNOHANG) if pid == 0: break except: break diff --git a/greentest/test_thread.py b/greentest/test_thread.py index b4d9bb1..9db6f7c 100644 --- a/greentest/test_thread.py +++ b/greentest/test_thread.py @@ -1,161 +1,161 @@ # Very rudimentary test of thread module # Create a bunch of threads, let each do some work, wait until all are done from greentest.test_support import verbose import random -from eventlet.green import thread -from eventlet.green import time +from eventlib.green import thread +from eventlib.green import time mutex = thread.allocate_lock() rmutex = thread.allocate_lock() # for calls to random running = 0 done = thread.allocate_lock() done.acquire() numtasks = 10 def task(ident): global running rmutex.acquire() delay = random.random() * numtasks * 0.02 rmutex.release() if verbose: print 'task', ident, 'will run for', round(delay, 2), 'sec' time.sleep(delay) if verbose: print 'task', ident, 'done' mutex.acquire() running = running - 1 if running == 0: done.release() mutex.release() next_ident = 0 def newtask(): global next_ident, running mutex.acquire() next_ident = next_ident + 1 if verbose: print 'creating task', next_ident thread.start_new_thread(task, (next_ident,)) running = running + 1 mutex.release() for i in range(numtasks): newtask() print 'waiting for all tasks to complete' done.acquire() print 'all tasks done' class barrier: def __init__(self, n): self.n = n self.waiting = 0 self.checkin = thread.allocate_lock() self.checkout = thread.allocate_lock() self.checkout.acquire() def enter(self): checkin, checkout = self.checkin, self.checkout checkin.acquire() self.waiting = self.waiting + 1 if self.waiting == self.n: self.waiting = self.n - 1 checkout.release() return checkin.release() checkout.acquire() self.waiting = self.waiting - 1 if self.waiting == 0: checkin.release() return checkout.release() numtrips = 3 def task2(ident): global running for i in range(numtrips): if ident == 0: # give it a good chance to enter the next # barrier before the others are all out # of the current one delay = 0.001 else: rmutex.acquire() delay = random.random() * numtasks * 0.02 rmutex.release() if verbose: print 'task', ident, 'will run for', round(delay, 2), 'sec' time.sleep(delay) if verbose: print 'task', ident, 'entering barrier', i bar.enter() if verbose: print 'task', ident, 'leaving barrier', i mutex.acquire() running -= 1 # Must release mutex before releasing done, else the main thread can # exit and set mutex to None as part of global teardown; then # mutex.release() raises AttributeError. 
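# (Hence the flag is computed while the mutex is still held, and 'done' is
# released only after mutex.release() below.)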
finished = running == 0 mutex.release() if finished: done.release() print '\n*** Barrier Test ***' if done.acquire(0): raise ValueError, "'done' should have remained acquired" bar = barrier(numtasks) running = numtasks for i in range(numtasks): thread.start_new_thread(task2, (i,)) done.acquire() print 'all tasks done' if hasattr(thread, 'stack_size'): # not all platforms support changing thread stack size print '\n*** Changing thread stack size ***' if thread.stack_size() != 0: raise ValueError, "initial stack_size not 0" thread.stack_size(0) if thread.stack_size() != 0: raise ValueError, "stack_size not reset to default" from os import name as os_name if os_name in ("nt", "os2", "posix"): tss_supported = 1 try: thread.stack_size(4096) except ValueError: print 'caught expected ValueError setting stack_size(4096)' except thread.error: tss_supported = 0 print 'platform does not support changing thread stack size' if tss_supported: failed = lambda s, e: s != e fail_msg = "stack_size(%d) failed - should succeed" for tss in (262144, 0x100000, 0): thread.stack_size(tss) if failed(thread.stack_size(), tss): raise ValueError, fail_msg % tss print 'successfully set stack_size(%d)' % tss for tss in (262144, 0x100000): print 'trying stack_size = %d' % tss next_ident = 0 for i in range(numtasks): newtask() print 'waiting for all tasks to complete' done.acquire() print 'all tasks done' # reset stack size to default thread.stack_size(0) diff --git a/greentest/test_thread__boundedsem.py b/greentest/test_thread__boundedsem.py index 7cd00c6..bc5c0f7 100644 --- a/greentest/test_thread__boundedsem.py +++ b/greentest/test_thread__boundedsem.py @@ -1,11 +1,11 @@ """Test that BoundedSemaphore with a very high bound is as good as unbounded one""" -from eventlet import coros -from eventlet.green import thread +from eventlib import coros +from eventlib.green import thread def allocate_lock(): return coros.semaphore(1, 9999) thread.allocate_lock = allocate_lock thread.LockType = coros.BoundedSemaphore execfile('test_thread.py') diff --git a/greentest/test_threading.py b/greentest/test_threading.py index 08bf39d..ef75e1f 100644 --- a/greentest/test_threading.py +++ b/greentest/test_threading.py @@ -1,306 +1,306 @@ # Very rudimentary test of threading module import greentest.test_support from greentest.test_support import verbose import random import sys -from eventlet.green import threading -from eventlet.green import thread -from eventlet.green import time +from eventlib.green import threading +from eventlib.green import thread +from eventlib.green import time import unittest # A trivial mutable counter. 
class Counter(object): def __init__(self): self.value = 0 def inc(self): self.value += 1 def dec(self): self.value -= 1 def get(self): return self.value class TestThread(threading.Thread): def __init__(self, name, testcase, sema, mutex, nrunning): threading.Thread.__init__(self, name=name) self.testcase = testcase self.sema = sema self.mutex = mutex self.nrunning = nrunning def run(self): delay = random.random() * 0.1 if verbose: print 'task', self.getName(), 'will run for', delay, 'sec' self.sema.acquire() self.mutex.acquire() self.nrunning.inc() if verbose: print self.nrunning.get(), 'tasks are running' self.testcase.assert_(self.nrunning.get() <= 3) self.mutex.release() time.sleep(delay) if verbose: print 'task', self.getName(), 'done' self.mutex.acquire() self.nrunning.dec() self.testcase.assert_(self.nrunning.get() >= 0) if verbose: print self.getName(), 'is finished.', self.nrunning.get(), \ 'tasks are running' self.mutex.release() self.sema.release() class ThreadTests(unittest.TestCase): # Create a bunch of threads, let each do some work, wait until all are # done. def test_various_ops(self): # This takes about n/3 seconds to run (about n/3 clumps of tasks, # times about 1 second per clump). NUMTASKS = 10 # no more than 3 of the 10 can run at once sema = threading.BoundedSemaphore(value=3) mutex = threading.RLock() numrunning = Counter() threads = [] for i in range(NUMTASKS): t = TestThread(""%i, self, sema, mutex, numrunning) threads.append(t) t.start() if verbose: print 'waiting for all tasks to complete' for t in threads: t.join(NUMTASKS) self.assert_(not t.isAlive()) if verbose: print 'all tasks done' self.assertEqual(numrunning.get(), 0) # run with a small(ish) thread stack size (256kB) if hasattr(threading, 'stack_size'): def test_various_ops_small_stack(self): if verbose: print 'with 256kB thread stack size...' try: threading.stack_size(262144) except thread.error: if verbose: print 'platform does not support changing thread stack size' return self.test_various_ops() threading.stack_size(0) # run with a large thread stack size (1MB) def test_various_ops_large_stack(self): if verbose: print 'with 1MB thread stack size...' try: threading.stack_size(0x100000) except thread.error: if verbose: print 'platform does not support changing thread stack size' return self.test_various_ops() threading.stack_size(0) def test_foreign_thread(self): # Check that a "foreign" thread can use the threading module. def f(mutex): # Acquiring an RLock forces an entry for the foreign # thread to get made in the threading._active map. r = threading.RLock() r.acquire() r.release() mutex.release() mutex = threading.Lock() mutex.acquire() tid = thread.start_new_thread(f, (mutex,)) # Wait for the thread to finish. mutex.acquire() self.assert_(tid in threading._active) self.assert_(isinstance(threading._active[tid], threading._DummyThread)) del threading._active[tid] # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) # exposed at the Python level. This test relies on ctypes to get at it. def test_PyThreadState_SetAsyncExc(self): try: import ctypes except ImportError: if verbose: print "test_PyThreadState_SetAsyncExc can't import ctypes" return # can't do anything set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc class AsyncExc(Exception): pass exception = ctypes.py_object(AsyncExc) # `worker_started` is set by the thread when it's inside a try/except # block waiting to catch the asynchronously set AsyncExc exception. 
# `worker_saw_exception` is set by the thread upon catching that # exception. worker_started = threading.Event() worker_saw_exception = threading.Event() class Worker(threading.Thread): def run(self): self.id = thread.get_ident() self.finished = False try: while True: worker_started.set() time.sleep(0.1) except AsyncExc: self.finished = True worker_saw_exception.set() t = Worker() t.setDaemon(True) # so if this fails, we don't hang Python at shutdown t.start() if verbose: print " started worker thread" # Try a thread id that doesn't make sense. if verbose: print " trying nonsensical thread id" result = set_async_exc(ctypes.c_long(-1), exception) self.assertEqual(result, 0) # no thread states modified # Now raise an exception in the worker thread. if verbose: print " waiting for worker thread to get started" worker_started.wait() if verbose: print " verifying worker hasn't exited" self.assert_(not t.finished) # if verbose: # print " attempting to raise asynch exception in worker" # result = set_async_exc(ctypes.c_long(t.id), exception) # self.assertEqual(result, 1) # one thread state modified # if verbose: # print " waiting for worker to say it caught the exception" # worker_saw_exception.wait(timeout=10) # self.assert_(t.finished) if verbose: print " all OK(2 disabled) -- joining worker" if t.finished: t.join() # else the thread is still running, and we have no way to kill it def test_enumerate_after_join(self): # Try hard to trigger #1703448: a thread is still returned in # threading.enumerate() after it has been join()ed. enum = threading.enumerate old_interval = sys.getcheckinterval() sys.setcheckinterval(1) try: for i in xrange(1, 1000): t = threading.Thread(target=lambda: None) t.start() t.join() l = enum() self.assertFalse(t in l, "#1703448 triggered after %d trials: %s" % (i, l)) finally: sys.setcheckinterval(old_interval) class ThreadJoinOnShutdown(unittest.TestCase): def _run_and_join(self, script): script = """if 1: import sys, os, time, threading # a thread, which waits for the main program to terminate def joiningfunc(mainthread): mainthread.join() print 'end of thread' \n""" + script import subprocess p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) rc = p.wait() data = p.stdout.read().replace('\r', '') self.assertEqual(data, "end of main\nend of thread\n") self.failIf(rc == 2, "interpreter was blocked") self.failUnless(rc == 0, "Unexpected error") def test_1_join_on_shutdown(self): # The usual case: on exit, wait for a non-daemon thread script = """if 1: import os t = threading.Thread(target=joiningfunc, args=(threading.currentThread(),)) t.start() time.sleep(0.1) print 'end of main' """ self._run_and_join(script) def test_2_join_in_forked_process(self): # Like the test above, but from a forked interpreter import os if not hasattr(os, 'fork'): return script = """if 1: childpid = os.fork() if childpid != 0: os.waitpid(childpid, 0) sys.exit(0) t = threading.Thread(target=joiningfunc, args=(threading.currentThread(),)) t.start() print 'end of main' """ self._run_and_join(script) def test_3_join_in_forked_from_thread(self): # Like the test above, but fork() was called from a worker thread # In the forked process, the main Thread object must be marked as stopped. 
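# The test below is a no-op on platforms without os.fork() (e.g. Windows).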
import os if not hasattr(os, 'fork'): return script = """if 1: main_thread = threading.currentThread() def worker(): childpid = os.fork() if childpid != 0: os.waitpid(childpid, 0) sys.exit(0) t = threading.Thread(target=joiningfunc, args=(main_thread,)) print 'end of main' t.start() t.join() # Should not block: main_thread is already stopped w = threading.Thread(target=worker) w.start() """ self._run_and_join(script) def test_main(): greentest.test_support.run_unittest(ThreadTests, ThreadJoinOnShutdown) if __name__ == "__main__": test_main() diff --git a/greentest/test_threading_local.py b/greentest/test_threading_local.py index f505459..bc19322 100644 --- a/greentest/test_threading_local.py +++ b/greentest/test_threading_local.py @@ -1,55 +1,55 @@ import unittest from doctest import DocTestSuite from greentest import test_support class ThreadingLocalTest(unittest.TestCase): def test_derived(self): # Issue 3088: if there is a threads switch inside the __init__ # of a threading.local derived class, the per-thread dictionary # is created but not correctly set on the object. # The first member set may be bogus. - from eventlet.green import threading - from eventlet.green import time + from eventlib.green import threading + from eventlib.green import time class Local(threading.local): def __init__(self): time.sleep(0.01) local = Local() def f(i): local.x = i # Simply check that the variable is correctly set self.assertEqual(local.x, i) threads= [] for i in range(10): t = threading.Thread(target=f, args=(i,)) t.start() threads.append(t) for t in threads: t.join() def test_main(): suite = DocTestSuite('_threading_local') try: - from eventlet.green.thread import _local + from eventlib.green.thread import _local except ImportError: pass else: import _threading_local local_orig = _threading_local.local def setUp(test): _threading_local.local = _local def tearDown(test): _threading_local.local = local_orig suite.addTest(DocTestSuite('_threading_local', setUp=setUp, tearDown=tearDown) ) suite.addTest(unittest.makeSuite(ThreadingLocalTest)) test_support.run_suite(suite) if __name__ == '__main__': test_main() diff --git a/greentest/test_timeout.py b/greentest/test_timeout.py index 982c443..53f1c6d 100644 --- a/greentest/test_timeout.py +++ b/greentest/test_timeout.py @@ -1,197 +1,197 @@ """Unit tests for socket timeout feature.""" import unittest from greentest import test_support # This requires the 'network' resource as given on the regrtest command line. 
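# (With the stock regrtest driver this is usually enabled by passing
# '-u network' or '-u all' on the command line.)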
skip_expected = not test_support.is_resource_enabled('network') -from eventlet.green import time -from eventlet.green import socket +from eventlib.green import time +from eventlib.green import socket class CreationTestCase(unittest.TestCase): """Test case for socket.gettimeout() and socket.settimeout()""" def setUp(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def tearDown(self): self.sock.close() def testObjectCreation(self): # Test Socket creation self.assertEqual(self.sock.gettimeout(), None, "timeout not disabled by default") def testFloatReturnValue(self): # Test return value of gettimeout() self.sock.settimeout(7.345) self.assertEqual(self.sock.gettimeout(), 7.345) self.sock.settimeout(3) self.assertEqual(self.sock.gettimeout(), 3) self.sock.settimeout(None) self.assertEqual(self.sock.gettimeout(), None) def testReturnType(self): # Test return type of gettimeout() self.sock.settimeout(1) self.assertEqual(type(self.sock.gettimeout()), type(1.0)) self.sock.settimeout(3.9) self.assertEqual(type(self.sock.gettimeout()), type(1.0)) def testTypeCheck(self): # Test type checking by settimeout() self.sock.settimeout(0) self.sock.settimeout(0L) self.sock.settimeout(0.0) self.sock.settimeout(None) self.assertRaises(TypeError, self.sock.settimeout, "") self.assertRaises(TypeError, self.sock.settimeout, u"") self.assertRaises(TypeError, self.sock.settimeout, ()) self.assertRaises(TypeError, self.sock.settimeout, []) self.assertRaises(TypeError, self.sock.settimeout, {}) self.assertRaises(TypeError, self.sock.settimeout, 0j) def testRangeCheck(self): # Test range checking by settimeout() self.assertRaises(ValueError, self.sock.settimeout, -1) self.assertRaises(ValueError, self.sock.settimeout, -1L) self.assertRaises(ValueError, self.sock.settimeout, -1.0) def testTimeoutThenBlocking(self): # Test settimeout() followed by setblocking() self.sock.settimeout(10) self.sock.setblocking(1) self.assertEqual(self.sock.gettimeout(), None) self.sock.setblocking(0) self.assertEqual(self.sock.gettimeout(), 0.0) self.sock.settimeout(10) self.sock.setblocking(0) self.assertEqual(self.sock.gettimeout(), 0.0) self.sock.setblocking(1) self.assertEqual(self.sock.gettimeout(), None) def testBlockingThenTimeout(self): # Test setblocking() followed by settimeout() self.sock.setblocking(0) self.sock.settimeout(1) self.assertEqual(self.sock.gettimeout(), 1) self.sock.setblocking(1) self.sock.settimeout(1) self.assertEqual(self.sock.gettimeout(), 1) class TimeoutTestCase(unittest.TestCase): """Test case for socket.socket() timeout functions""" # There are a number of tests here trying to make sure that an operation # doesn't take too much longer than expected. But competing machine # activity makes it inevitable that such tests will fail at times. # When fuzz was at 1.0, I (tim) routinely saw bogus failures on Win2K # and Win98SE. Boosting it to 2.0 helped a lot, but isn't a real # solution. fuzz = 2.0 def setUp(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.addr_remote = ('www.python.org.', 80) self.addr_local = ('127.0.0.1', 25339) def tearDown(self): self.sock.close() def testConnectTimeout(self): # Test connect() timeout _timeout = 0.001 self.sock.settimeout(_timeout) # If we are too close to www.python.org, this test will fail. # Pick a host that should be farther away. 
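# getfqdn() is consulted so that machines that appear to sit inside python.org
# or xs4all get pointed at a more distant host (tut.fi) instead.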
if (socket.getfqdn().split('.')[-2:] == ['python', 'org'] or socket.getfqdn().split('.')[-2:-1] == ['xs4all']): self.addr_remote = ('tut.fi', 80) _t1 = time.time() self.failUnlessRaises(socket.error, self.sock.connect, self.addr_remote) _t2 = time.time() _delta = abs(_t1 - _t2) self.assert_(_delta < _timeout + self.fuzz, "timeout (%g) is more than %g seconds more than expected (%g)" %(_delta, self.fuzz, _timeout)) def testRecvTimeout(self): # Test recv() timeout _timeout = 0.02 self.sock.connect(self.addr_remote) self.sock.settimeout(_timeout) _t1 = time.time() self.failUnlessRaises(socket.error, self.sock.recv, 1024) _t2 = time.time() _delta = abs(_t1 - _t2) self.assert_(_delta < _timeout + self.fuzz, "timeout (%g) is %g seconds more than expected (%g)" %(_delta, self.fuzz, _timeout)) def testAcceptTimeout(self): # Test accept() timeout _timeout = 2 self.sock.settimeout(_timeout) self.sock.bind(self.addr_local) self.sock.listen(5) _t1 = time.time() self.failUnlessRaises(socket.error, self.sock.accept) _t2 = time.time() _delta = abs(_t1 - _t2) self.assert_(_delta < _timeout + self.fuzz, "timeout (%g) is %g seconds more than expected (%g)" %(_delta, self.fuzz, _timeout)) def testRecvfromTimeout(self): # Test recvfrom() timeout _timeout = 2 self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock.settimeout(_timeout) self.sock.bind(self.addr_local) _t1 = time.time() self.failUnlessRaises(socket.error, self.sock.recvfrom, 8192) _t2 = time.time() _delta = abs(_t1 - _t2) self.assert_(_delta < _timeout + self.fuzz, "timeout (%g) is %g seconds more than expected (%g)" %(_delta, self.fuzz, _timeout)) def testSend(self): # Test send() timeout # couldn't figure out how to test it pass def testSendto(self): # Test sendto() timeout # couldn't figure out how to test it pass def testSendall(self): # Test sendall() timeout # couldn't figure out how to test it pass def test_main(): test_support.requires('network') test_support.run_unittest(CreationTestCase, TimeoutTestCase) if __name__ == "__main__": test_main() diff --git a/greentest/test_urllib2.py b/greentest/test_urllib2.py index 77f0a03..52d6e5d 100644 --- a/greentest/test_urllib2.py +++ b/greentest/test_urllib2.py @@ -1,1064 +1,1064 @@ import unittest from greentest import test_support import os -from eventlet.green import socket +from eventlib.green import socket import StringIO -from eventlet.green import urllib2 -from eventlet.green.urllib2 import Request, OpenerDirector +from eventlib.green import urllib2 +from eventlib.green.urllib2 import Request, OpenerDirector # XXX # Request # CacheFTPHandler (hard to write) # parse_keqv_list, parse_http_list, HTTPDigestAuthHandler class TrivialTests(unittest.TestCase): def test_trivial(self): # A couple trivial tests self.assertRaises(ValueError, urllib2.urlopen, 'bogus url') # XXX Name hacking to get this to work on Windows. fname = os.path.abspath(urllib2.__file__).replace('\\', '/') if fname[1:2] == ":": fname = fname[2:] # And more hacking to get it to work on MacOS. This assumes # urllib.pathname2url works, unfortunately... 
if os.name == 'mac': fname = '/' + fname.replace(':', '/') elif os.name == 'riscos': import string fname = os.expand(fname) fname = fname.translate(string.maketrans("/.", "./")) file_url = "file://%s" % fname f = urllib2.urlopen(file_url) buf = f.read() f.close() def test_parse_http_list(self): tests = [('a,b,c', ['a', 'b', 'c']), ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']), ('a, b, "c", "d", "e,f", g, h', ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']), ('a="b\\"c", d="e\\,f", g="h\\\\i"', ['a="b"c"', 'd="e,f"', 'g="h\\i"'])] for string, list in tests: self.assertEquals(urllib2.parse_http_list(string), list) def test_request_headers_dict(): """ The Request.headers dictionary is not a documented interface. It should stay that way, because the complete set of headers are only accessible through the .get_header(), .has_header(), .header_items() interface. However, .headers pre-dates those methods, and so real code will be using the dictionary. The introduction in 2.4 of those methods was a mistake for the same reason: code that previously saw all (urllib2 user)-provided headers in .headers now sees only a subset (and the function interface is ugly and incomplete). A better change would have been to replace .headers dict with a dict subclass (or UserDict.DictMixin instance?) that preserved the .headers interface and also provided access to the "unredirected" headers. It's probably too late to fix that, though. Check .capitalize() case normalization: >>> url = "http://example.com" >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"] 'blah' >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"] 'blah' Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError, but that could be changed in future. """ def test_request_headers_methods(): """ Note the case normalization of header names here, to .capitalize()-case. This should be preserved for backwards-compatibility. (In the HTTP case, normalization to .title()-case is done by urllib2 before sending headers to httplib). >>> url = "http://example.com" >>> r = Request(url, headers={"Spam-eggs": "blah"}) >>> r.has_header("Spam-eggs") True >>> r.header_items() [('Spam-eggs', 'blah')] >>> r.add_header("Foo-Bar", "baz") >>> items = r.header_items() >>> items.sort() >>> items [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')] Note that e.g. r.has_header("spam-EggS") is currently False, and r.get_header("spam-EggS") returns None, but that could be changed in future. 
>>> r.has_header("Not-there") False >>> print r.get_header("Not-there") None >>> r.get_header("Not-there", "default") 'default' """ def test_password_manager(self): """ >>> mgr = urllib2.HTTPPasswordMgr() >>> add = mgr.add_password >>> add("Some Realm", "http://example.com/", "joe", "password") >>> add("Some Realm", "http://example.com/ni", "ni", "ni") >>> add("c", "http://example.com/foo", "foo", "ni") >>> add("c", "http://example.com/bar", "bar", "nini") >>> add("b", "http://example.com/", "first", "blah") >>> add("b", "http://example.com/", "second", "spam") >>> add("a", "http://example.com", "1", "a") >>> add("Some Realm", "http://c.example.com:3128", "3", "c") >>> add("Some Realm", "d.example.com", "4", "d") >>> add("Some Realm", "e.example.com:3128", "5", "e") >>> mgr.find_user_password("Some Realm", "example.com") ('joe', 'password') >>> mgr.find_user_password("Some Realm", "http://example.com") ('joe', 'password') >>> mgr.find_user_password("Some Realm", "http://example.com/") ('joe', 'password') >>> mgr.find_user_password("Some Realm", "http://example.com/spam") ('joe', 'password') >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam") ('joe', 'password') >>> mgr.find_user_password("c", "http://example.com/foo") ('foo', 'ni') >>> mgr.find_user_password("c", "http://example.com/bar") ('bar', 'nini') Actually, this is really undefined ATM ## Currently, we use the highest-level path where more than one match: ## >>> mgr.find_user_password("Some Realm", "http://example.com/ni") ## ('joe', 'password') Use latest add_password() in case of conflict: >>> mgr.find_user_password("b", "http://example.com/") ('second', 'spam') No special relationship between a.example.com and example.com: >>> mgr.find_user_password("a", "http://example.com/") ('1', 'a') >>> mgr.find_user_password("a", "http://a.example.com/") (None, None) Ports: >>> mgr.find_user_password("Some Realm", "c.example.com") (None, None) >>> mgr.find_user_password("Some Realm", "c.example.com:3128") ('3', 'c') >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128") ('3', 'c') >>> mgr.find_user_password("Some Realm", "d.example.com") ('4', 'd') >>> mgr.find_user_password("Some Realm", "e.example.com:3128") ('5', 'e') """ pass def test_password_manager_default_port(self): """ >>> mgr = urllib2.HTTPPasswordMgr() >>> add = mgr.add_password The point to note here is that we can't guess the default port if there's no scheme. This applies to both add_password and find_user_password. 
>>> add("f", "http://g.example.com:80", "10", "j") >>> add("g", "http://h.example.com", "11", "k") >>> add("h", "i.example.com:80", "12", "l") >>> add("i", "j.example.com", "13", "m") >>> mgr.find_user_password("f", "g.example.com:100") (None, None) >>> mgr.find_user_password("f", "g.example.com:80") ('10', 'j') >>> mgr.find_user_password("f", "g.example.com") (None, None) >>> mgr.find_user_password("f", "http://g.example.com:100") (None, None) >>> mgr.find_user_password("f", "http://g.example.com:80") ('10', 'j') >>> mgr.find_user_password("f", "http://g.example.com") ('10', 'j') >>> mgr.find_user_password("g", "h.example.com") ('11', 'k') >>> mgr.find_user_password("g", "h.example.com:80") ('11', 'k') >>> mgr.find_user_password("g", "http://h.example.com:80") ('11', 'k') >>> mgr.find_user_password("h", "i.example.com") (None, None) >>> mgr.find_user_password("h", "i.example.com:80") ('12', 'l') >>> mgr.find_user_password("h", "http://i.example.com:80") ('12', 'l') >>> mgr.find_user_password("i", "j.example.com") ('13', 'm') >>> mgr.find_user_password("i", "j.example.com:80") (None, None) >>> mgr.find_user_password("i", "http://j.example.com") ('13', 'm') >>> mgr.find_user_password("i", "http://j.example.com:80") (None, None) """ class MockOpener: addheaders = [] def open(self, req, data=None): self.req, self.data = req, data def error(self, proto, *args): self.proto, self.args = proto, args class MockFile: def read(self, count=None): pass def readline(self, count=None): pass def close(self): pass class MockHeaders(dict): def getheaders(self, name): return self.values() class MockResponse(StringIO.StringIO): def __init__(self, code, msg, headers, data, url=None): StringIO.StringIO.__init__(self, data) self.code, self.msg, self.headers, self.url = code, msg, headers, url def info(self): return self.headers def geturl(self): return self.url class MockCookieJar: def add_cookie_header(self, request): self.ach_req = request def extract_cookies(self, response, request): self.ec_req, self.ec_r = request, response class FakeMethod: def __init__(self, meth_name, action, handle): self.meth_name = meth_name self.handle = handle self.action = action def __call__(self, *args): return self.handle(self.meth_name, self.action, *args) class MockHandler: # useful for testing handler machinery # see add_ordered_mock_handlers() docstring handler_order = 500 def __init__(self, methods): self._define_methods(methods) def _define_methods(self, methods): for spec in methods: if len(spec) == 2: name, action = spec else: name, action = spec, None meth = FakeMethod(name, action, self.handle) setattr(self.__class__, name, meth) def handle(self, fn_name, action, *args, **kwds): self.parent.calls.append((self, fn_name, args, kwds)) if action is None: return None elif action == "return self": return self elif action == "return response": res = MockResponse(200, "OK", {}, "") return res elif action == "return request": return Request("http://blah/") elif action.startswith("error"): code = action[action.rfind(" ")+1:] try: code = int(code) except ValueError: pass res = MockResponse(200, "OK", {}, "") return self.parent.error("http", args[0], res, code, "", {}) elif action == "raise": raise urllib2.URLError("blah") assert False def close(self): pass def add_parent(self, parent): self.parent = parent self.parent.calls = [] def __lt__(self, other): if not hasattr(other, "handler_order"): # No handler_order, leave in original order. Yuck. 
return True return self.handler_order < other.handler_order def add_ordered_mock_handlers(opener, meth_spec): """Create MockHandlers and add them to an OpenerDirector. meth_spec: list of lists of tuples and strings defining methods to define on handlers. eg: [["http_error", "ftp_open"], ["http_open"]] defines methods .http_error() and .ftp_open() on one handler, and .http_open() on another. These methods just record their arguments and return None. Using a tuple instead of a string causes the method to perform some action (see MockHandler.handle()), eg: [["http_error"], [("http_open", "return request")]] defines .http_error() on one handler (which simply returns None), and .http_open() on another handler, which returns a Request object. """ handlers = [] count = 0 for meths in meth_spec: class MockHandlerSubclass(MockHandler): pass h = MockHandlerSubclass(meths) h.handler_order += count h.add_parent(opener) count = count + 1 handlers.append(h) opener.add_handler(h) return handlers def build_test_opener(*handler_instances): opener = OpenerDirector() for h in handler_instances: opener.add_handler(h) return opener class MockHTTPHandler(urllib2.BaseHandler): # useful for testing redirections and auth # sends supplied headers and code as first response # sends 200 OK as second response def __init__(self, code, headers): self.code = code self.headers = headers self.reset() def reset(self): self._count = 0 self.requests = [] def http_open(self, req): import mimetools, httplib, copy from StringIO import StringIO self.requests.append(copy.deepcopy(req)) if self._count == 0: self._count = self._count + 1 name = httplib.responses[self.code] msg = mimetools.Message(StringIO(self.headers)) return self.parent.error( "http", req, MockFile(), self.code, name, msg) else: self.req = req msg = mimetools.Message(StringIO("\r\n\r\n")) return MockResponse(200, "OK", msg, "", req.get_full_url()) class MockPasswordManager: def add_password(self, realm, uri, user, password): self.realm = realm self.url = uri self.user = user self.password = password def find_user_password(self, realm, authuri): self.target_realm = realm self.target_url = authuri return self.user, self.password class OpenerDirectorTests(unittest.TestCase): def test_add_non_handler(self): class NonHandler(object): pass self.assertRaises(TypeError, OpenerDirector().add_handler, NonHandler()) def test_badly_named_methods(self): # test work-around for three methods that accidentally follow the # naming conventions for handler methods # (*_open() / *_request() / *_response()) # These used to call the accidentally-named methods, causing a # TypeError in real code; here, returning self from these mock # methods would either cause no exception, or AttributeError. from urllib2 import URLError o = OpenerDirector() meth_spec = [ [("do_open", "return self"), ("proxy_open", "return self")], [("redirect_request", "return self")], ] handlers = add_ordered_mock_handlers(o, meth_spec) o.add_handler(urllib2.UnknownHandler()) for scheme in "do", "proxy", "redirect": self.assertRaises(URLError, o.open, scheme+"://example.com/") def test_handled(self): # handler returning non-None means no more handlers will be called o = OpenerDirector() meth_spec = [ ["http_open", "ftp_open", "http_error_302"], ["ftp_open"], [("http_open", "return self")], [("http_open", "return self")], ] handlers = add_ordered_mock_handlers(o, meth_spec) req = Request("http://example.com/") r = o.open(req) # Second .http_open() gets called, third doesn't, since second returned # non-None. 
Handlers without .http_open() never get any methods called # on them. # In fact, second mock handler defining .http_open() returns self # (instead of response), which becomes the OpenerDirector's return # value. self.assertEqual(r, handlers[2]) calls = [(handlers[0], "http_open"), (handlers[2], "http_open")] for expected, got in zip(calls, o.calls): handler, name, args, kwds = got self.assertEqual((handler, name), expected) self.assertEqual(args, (req,)) def test_handler_order(self): o = OpenerDirector() handlers = [] for meths, handler_order in [ ([("http_open", "return self")], 500), (["http_open"], 0), ]: class MockHandlerSubclass(MockHandler): pass h = MockHandlerSubclass(meths) h.handler_order = handler_order handlers.append(h) o.add_handler(h) r = o.open("http://example.com/") # handlers called in reverse order, thanks to their sort order self.assertEqual(o.calls[0][0], handlers[1]) self.assertEqual(o.calls[1][0], handlers[0]) def test_raise(self): # raising URLError stops processing of request o = OpenerDirector() meth_spec = [ [("http_open", "raise")], [("http_open", "return self")], ] handlers = add_ordered_mock_handlers(o, meth_spec) req = Request("http://example.com/") self.assertRaises(urllib2.URLError, o.open, req) self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})]) ## def test_error(self): ## # XXX this doesn't actually seem to be used in standard library, ## # but should really be tested anyway... def test_http_error(self): # XXX http_error_default # http errors are a special case o = OpenerDirector() meth_spec = [ [("http_open", "error 302")], [("http_error_400", "raise"), "http_open"], [("http_error_302", "return response"), "http_error_303", "http_error"], [("http_error_302")], ] handlers = add_ordered_mock_handlers(o, meth_spec) class Unknown: def __eq__(self, other): return True req = Request("http://example.com/") r = o.open(req) assert len(o.calls) == 2 calls = [(handlers[0], "http_open", (req,)), (handlers[2], "http_error_302", (req, Unknown(), 302, "", {}))] for expected, got in zip(calls, o.calls): handler, method_name, args = expected self.assertEqual((handler, method_name), got[:2]) self.assertEqual(args, got[2]) def test_processors(self): # *_request / *_response methods get called appropriately o = OpenerDirector() meth_spec = [ [("http_request", "return request"), ("http_response", "return response")], [("http_request", "return request"), ("http_response", "return response")], ] handlers = add_ordered_mock_handlers(o, meth_spec) req = Request("http://example.com/") r = o.open(req) # processor methods are called on *all* handlers that define them, # not just the first handler that handles the request calls = [ (handlers[0], "http_request"), (handlers[1], "http_request"), (handlers[0], "http_response"), (handlers[1], "http_response")] for i, (handler, name, args, kwds) in enumerate(o.calls): if i < 2: # *_request self.assertEqual((handler, name), calls[i]) self.assertEqual(len(args), 1) self.assert_(isinstance(args[0], Request)) else: # *_response self.assertEqual((handler, name), calls[i]) self.assertEqual(len(args), 2) self.assert_(isinstance(args[0], Request)) # response from opener.open is None, because there's no # handler that defines http_open to handle it self.assert_(args[1] is None or isinstance(args[1], MockResponse)) def sanepathname2url(path): import urllib urlpath = urllib.pathname2url(path) if os.name == "nt" and urlpath.startswith("///"): urlpath = urlpath[2:] # XXX don't ask me about the mac... 
return urlpath class HandlerTests(unittest.TestCase): def test_ftp(self): class MockFTPWrapper: def __init__(self, data): self.data = data def retrfile(self, filename, filetype): self.filename, self.filetype = filename, filetype return StringIO.StringIO(self.data), len(self.data) class NullFTPHandler(urllib2.FTPHandler): def __init__(self, data): self.data = data def connect_ftp(self, user, passwd, host, port, dirs): self.user, self.passwd = user, passwd self.host, self.port = host, port self.dirs = dirs self.ftpwrapper = MockFTPWrapper(self.data) return self.ftpwrapper import ftplib, socket data = "rheum rhaponicum" h = NullFTPHandler(data) o = h.parent = MockOpener() for url, host, port, type_, dirs, filename, mimetype in [ ("ftp://localhost/foo/bar/baz.html", "localhost", ftplib.FTP_PORT, "I", ["foo", "bar"], "baz.html", "text/html"), ("ftp://localhost:80/foo/bar/", "localhost", 80, "D", ["foo", "bar"], "", None), ("ftp://localhost/baz.gif;type=a", "localhost", ftplib.FTP_PORT, "A", [], "baz.gif", None), # XXX really this should guess image/gif ]: r = h.ftp_open(Request(url)) # ftp authentication not yet implemented by FTPHandler self.assert_(h.user == h.passwd == "") self.assertEqual(h.host, socket.gethostbyname(host)) self.assertEqual(h.port, port) self.assertEqual(h.dirs, dirs) self.assertEqual(h.ftpwrapper.filename, filename) self.assertEqual(h.ftpwrapper.filetype, type_) headers = r.info() self.assertEqual(headers.get("Content-type"), mimetype) self.assertEqual(int(headers["Content-length"]), len(data)) def test_file(self): import rfc822 h = urllib2.FileHandler() o = h.parent = MockOpener() TESTFN = test_support.TESTFN urlpath = sanepathname2url(os.path.abspath(TESTFN)) towrite = "hello, world\n" urls = [ "file://localhost%s" % urlpath, "file://%s" % urlpath, "file://%s%s" % (socket.gethostbyname('localhost'), urlpath), ] try: localaddr = socket.gethostbyname(socket.gethostname()) except socket.gaierror: localaddr = '' if localaddr: urls.append("file://%s%s" % (localaddr, urlpath)) for url in urls: f = open(TESTFN, "wb") try: try: f.write(towrite) finally: f.close() r = h.file_open(Request(url)) try: data = r.read() headers = r.info() newurl = r.geturl() finally: r.close() stats = os.stat(TESTFN) modified = rfc822.formatdate(stats.st_mtime) finally: os.remove(TESTFN) self.assertEqual(data, towrite) self.assertEqual(headers["Content-type"], "text/plain") self.assertEqual(headers["Content-length"], "13") self.assertEqual(headers["Last-modified"], modified) for url in [ "file://localhost:80%s" % urlpath, # XXXX bug: these fail with socket.gaierror, should be URLError ## "file://%s:80%s/%s" % (socket.gethostbyname('localhost'), ## os.getcwd(), TESTFN), ## "file://somerandomhost.ontheinternet.com%s/%s" % ## (os.getcwd(), TESTFN), ]: try: f = open(TESTFN, "wb") try: f.write(towrite) finally: f.close() self.assertRaises(urllib2.URLError, h.file_open, Request(url)) finally: os.remove(TESTFN) h = urllib2.FileHandler() o = h.parent = MockOpener() # XXXX why does // mean ftp (and /// mean not ftp!), and where # is file: scheme specified? 
I think this is really a bug, and # what was intended was to distinguish between URLs like: # file:/blah.txt (a file) # file://localhost/blah.txt (a file) # file:///blah.txt (a file) # file://ftp.example.com/blah.txt (an ftp URL) for url, ftp in [ ("file://ftp.example.com//foo.txt", True), ("file://ftp.example.com///foo.txt", False), # XXXX bug: fails with OSError, should be URLError ("file://ftp.example.com/foo.txt", False), ]: req = Request(url) try: h.file_open(req) # XXXX remove OSError when bug fixed except (urllib2.URLError, OSError): self.assert_(not ftp) else: self.assert_(o.req is req) self.assertEqual(req.type, "ftp") def test_http(self): class MockHTTPResponse: def __init__(self, fp, msg, status, reason): self.fp = fp self.msg = msg self.status = status self.reason = reason def read(self): return '' class MockHTTPClass: def __init__(self): self.req_headers = [] self.data = None self.raise_on_endheaders = False def __call__(self, host): self.host = host return self def set_debuglevel(self, level): self.level = level def request(self, method, url, body=None, headers={}): self.method = method self.selector = url self.req_headers += headers.items() self.req_headers.sort() if body: self.data = body if self.raise_on_endheaders: import socket raise socket.error() def getresponse(self): return MockHTTPResponse(MockFile(), {}, 200, "OK") h = urllib2.AbstractHTTPHandler() o = h.parent = MockOpener() url = "http://example.com/" for method, data in [("GET", None), ("POST", "blah")]: req = Request(url, data, {"Foo": "bar"}) req.add_unredirected_header("Spam", "eggs") http = MockHTTPClass() r = h.do_open(http, req) # result attributes r.read; r.readline # wrapped MockFile methods r.info; r.geturl # addinfourl methods r.code, r.msg == 200, "OK" # added from MockHTTPClass.getreply() hdrs = r.info() hdrs.get; hdrs.has_key # r.info() gives dict from .getreply() self.assertEqual(r.geturl(), url) self.assertEqual(http.host, "example.com") self.assertEqual(http.level, 0) self.assertEqual(http.method, method) self.assertEqual(http.selector, "/") self.assertEqual(http.req_headers, [("Connection", "close"), ("Foo", "bar"), ("Spam", "eggs")]) self.assertEqual(http.data, data) # check socket.error converted to URLError http.raise_on_endheaders = True self.assertRaises(urllib2.URLError, h.do_open, http, req) # check adding of standard headers o.addheaders = [("Spam", "eggs")] for data in "", None: # POST, GET req = Request("http://example.com/", data) r = MockResponse(200, "OK", {}, "") newreq = h.do_request_(req) if data is None: # GET self.assert_("Content-length" not in req.unredirected_hdrs) self.assert_("Content-type" not in req.unredirected_hdrs) else: # POST self.assertEqual(req.unredirected_hdrs["Content-length"], "0") self.assertEqual(req.unredirected_hdrs["Content-type"], "application/x-www-form-urlencoded") # XXX the details of Host could be better tested self.assertEqual(req.unredirected_hdrs["Host"], "example.com") self.assertEqual(req.unredirected_hdrs["Spam"], "eggs") # don't clobber existing headers req.add_unredirected_header("Content-length", "foo") req.add_unredirected_header("Content-type", "bar") req.add_unredirected_header("Host", "baz") req.add_unredirected_header("Spam", "foo") newreq = h.do_request_(req) self.assertEqual(req.unredirected_hdrs["Content-length"], "foo") self.assertEqual(req.unredirected_hdrs["Content-type"], "bar") self.assertEqual(req.unredirected_hdrs["Host"], "baz") self.assertEqual(req.unredirected_hdrs["Spam"], "foo") def test_errors(self): h = 
urllib2.HTTPErrorProcessor() o = h.parent = MockOpener() url = "http://example.com/" req = Request(url) # 200 OK is passed through r = MockResponse(200, "OK", {}, "", url) newr = h.http_response(req, r) self.assert_(r is newr) self.assert_(not hasattr(o, "proto")) # o.error not called # anything else calls o.error (and MockOpener returns None, here) r = MockResponse(201, "Created", {}, "", url) self.assert_(h.http_response(req, r) is None) self.assertEqual(o.proto, "http") # o.error called self.assertEqual(o.args, (req, r, 201, "Created", {})) def test_cookies(self): cj = MockCookieJar() h = urllib2.HTTPCookieProcessor(cj) o = h.parent = MockOpener() req = Request("http://example.com/") r = MockResponse(200, "OK", {}, "") newreq = h.http_request(req) self.assert_(cj.ach_req is req is newreq) self.assertEquals(req.get_origin_req_host(), "example.com") self.assert_(not req.is_unverifiable()) newr = h.http_response(req, r) self.assert_(cj.ec_req is req) self.assert_(cj.ec_r is r is newr) def test_redirect(self): from_url = "http://example.com/a.html" to_url = "http://example.com/b.html" h = urllib2.HTTPRedirectHandler() o = h.parent = MockOpener() # ordinary redirect behaviour for code in 301, 302, 303, 307: for data in None, "blah\nblah\n": method = getattr(h, "http_error_%s" % code) req = Request(from_url, data) req.add_header("Nonsense", "viking=withhold") req.add_unredirected_header("Spam", "spam") try: method(req, MockFile(), code, "Blah", MockHeaders({"location": to_url})) except urllib2.HTTPError: # 307 in response to POST requires user OK self.assert_(code == 307 and data is not None) self.assertEqual(o.req.get_full_url(), to_url) try: self.assertEqual(o.req.get_method(), "GET") except AttributeError: self.assert_(not o.req.has_data()) self.assertEqual(o.req.headers["Nonsense"], "viking=withhold") self.assert_("Spam" not in o.req.headers) self.assert_("Spam" not in o.req.unredirected_hdrs) # loop detection req = Request(from_url) def redirect(h, req, url=to_url): h.http_error_302(req, MockFile(), 302, "Blah", MockHeaders({"location": url})) # Note that the *original* request shares the same record of # redirections with the sub-requests caused by the redirections. 
# detect infinite loop redirect of a URL to itself req = Request(from_url, origin_req_host="example.com") count = 0 try: while 1: redirect(h, req, "http://example.com/") count = count + 1 except urllib2.HTTPError: # don't stop until max_repeats, because cookies may introduce state self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats) # detect endless non-repeating chain of redirects req = Request(from_url, origin_req_host="example.com") count = 0 try: while 1: redirect(h, req, "http://example.com/%d" % count) count = count + 1 except urllib2.HTTPError: self.assertEqual(count, urllib2.HTTPRedirectHandler.max_redirections) # don't want to add test_cookielib here def dont_test_cookie_redirect(self): # cookies shouldn't leak into redirected requests from cookielib import CookieJar from test.test_cookielib import interact_netscape cj = CookieJar() interact_netscape(cj, "http://www.example.com/", "spam=eggs") hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n") hdeh = urllib2.HTTPDefaultErrorHandler() hrh = urllib2.HTTPRedirectHandler() cp = urllib2.HTTPCookieProcessor(cj) o = build_test_opener(hh, hdeh, hrh, cp) o.open("http://www.example.com/") self.assert_(not hh.req.has_header("Cookie")) def test_proxy(self): o = OpenerDirector() ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128")) o.add_handler(ph) meth_spec = [ [("http_open", "return response")] ] handlers = add_ordered_mock_handlers(o, meth_spec) req = Request("http://acme.example.com/") self.assertEqual(req.get_host(), "acme.example.com") r = o.open(req) self.assertEqual(req.get_host(), "proxy.example.com:3128") self.assertEqual([(handlers[0], "http_open")], [tup[0:2] for tup in o.calls]) def test_basic_auth(self): opener = OpenerDirector() password_manager = MockPasswordManager() auth_handler = urllib2.HTTPBasicAuthHandler(password_manager) realm = "ACME Widget Store" http_handler = MockHTTPHandler( 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm) opener.add_handler(auth_handler) opener.add_handler(http_handler) self._test_basic_auth(opener, auth_handler, "Authorization", realm, http_handler, password_manager, "http://acme.example.com/protected", "http://acme.example.com/protected", ) def test_proxy_basic_auth(self): opener = OpenerDirector() ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128")) opener.add_handler(ph) password_manager = MockPasswordManager() auth_handler = urllib2.ProxyBasicAuthHandler(password_manager) realm = "ACME Networks" http_handler = MockHTTPHandler( 407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm) opener.add_handler(auth_handler) opener.add_handler(http_handler) self._test_basic_auth(opener, auth_handler, "Proxy-authorization", realm, http_handler, password_manager, "http://acme.example.com:3128/protected", "proxy.example.com:3128", ) def test_basic_and_digest_auth_handlers(self): # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40* # response (http://python.org/sf/1479302), where it should instead # return None to allow another handler (especially # HTTPBasicAuthHandler) to handle the response. 
# Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must # try digest first (since it's the strongest auth scheme), so we record # order of calls here to check digest comes first: class RecordingOpenerDirector(OpenerDirector): def __init__(self): OpenerDirector.__init__(self) self.recorded = [] def record(self, info): self.recorded.append(info) class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler): def http_error_401(self, *args, **kwds): self.parent.record("digest") urllib2.HTTPDigestAuthHandler.http_error_401(self, *args, **kwds) class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler): def http_error_401(self, *args, **kwds): self.parent.record("basic") urllib2.HTTPBasicAuthHandler.http_error_401(self, *args, **kwds) opener = RecordingOpenerDirector() password_manager = MockPasswordManager() digest_handler = TestDigestAuthHandler(password_manager) basic_handler = TestBasicAuthHandler(password_manager) realm = "ACME Networks" http_handler = MockHTTPHandler( 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm) opener.add_handler(basic_handler) opener.add_handler(digest_handler) opener.add_handler(http_handler) # check basic auth isn't blocked by digest handler failing self._test_basic_auth(opener, basic_handler, "Authorization", realm, http_handler, password_manager, "http://acme.example.com/protected", "http://acme.example.com/protected", ) # check digest was tried before basic (twice, because # _test_basic_auth called .open() twice) self.assertEqual(opener.recorded, ["digest", "basic"]*2) def _test_basic_auth(self, opener, auth_handler, auth_header, realm, http_handler, password_manager, request_url, protected_url): import base64, httplib user, password = "wile", "coyote" # .add_password() fed through to password manager auth_handler.add_password(realm, request_url, user, password) self.assertEqual(realm, password_manager.realm) self.assertEqual(request_url, password_manager.url) self.assertEqual(user, password_manager.user) self.assertEqual(password, password_manager.password) r = opener.open(request_url) # should have asked the password manager for the username/password self.assertEqual(password_manager.target_realm, realm) self.assertEqual(password_manager.target_url, protected_url) # expect one request without authorization, then one with self.assertEqual(len(http_handler.requests), 2) self.assertFalse(http_handler.requests[0].has_header(auth_header)) userpass = '%s:%s' % (user, password) auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip() self.assertEqual(http_handler.requests[1].get_header(auth_header), auth_hdr_value) # if the password manager can't find a password, the handler won't # handle the HTTP auth error password_manager.user = password_manager.password = None http_handler.reset() r = opener.open(request_url) self.assertEqual(len(http_handler.requests), 1) self.assertFalse(http_handler.requests[0].has_header(auth_header)) class MiscTests(unittest.TestCase): def test_build_opener(self): class MyHTTPHandler(urllib2.HTTPHandler): pass class FooHandler(urllib2.BaseHandler): def foo_open(self): pass class BarHandler(urllib2.BaseHandler): def bar_open(self): pass build_opener = urllib2.build_opener o = build_opener(FooHandler, BarHandler) self.opener_has_handler(o, FooHandler) self.opener_has_handler(o, BarHandler) # can take a mix of classes and instances o = build_opener(FooHandler, BarHandler()) self.opener_has_handler(o, FooHandler) self.opener_has_handler(o, BarHandler) # subclasses of default handlers override default handlers o = 
build_opener(MyHTTPHandler) self.opener_has_handler(o, MyHTTPHandler) # a particular case of overriding: default handlers can be passed # in explicitly o = build_opener() self.opener_has_handler(o, urllib2.HTTPHandler) o = build_opener(urllib2.HTTPHandler) self.opener_has_handler(o, urllib2.HTTPHandler) o = build_opener(urllib2.HTTPHandler()) self.opener_has_handler(o, urllib2.HTTPHandler) # Issue2670: multiple handlers sharing the same base class class MyOtherHTTPHandler(urllib2.HTTPHandler): pass o = build_opener(MyHTTPHandler, MyOtherHTTPHandler) self.opener_has_handler(o, MyHTTPHandler) self.opener_has_handler(o, MyOtherHTTPHandler) def opener_has_handler(self, opener, handler_class): for h in opener.handlers: if h.__class__ == handler_class: break else: self.assert_(False) def test_main(verbose=None): tests = (TrivialTests, OpenerDirectorTests, HandlerTests, MiscTests) test_support.run_unittest(*tests) if __name__ == "__main__": test_main(verbose=True) diff --git a/greentest/test_urllib2_localnet.py b/greentest/test_urllib2_localnet.py index a8e0d39..c298ce2 100755 --- a/greentest/test_urllib2_localnet.py +++ b/greentest/test_urllib2_localnet.py @@ -1,306 +1,306 @@ #!/usr/bin/python2 from greentest import exit_unless_25; exit_unless_25() import sys import urlparse import unittest import hashlib from greentest import test_support -from eventlet.green import threading -from eventlet.green import socket -from eventlet.green import urllib2 -from eventlet.green import BaseHTTPServer +from eventlib.green import threading +from eventlib.green import socket +from eventlib.green import urllib2 +from eventlib.green import BaseHTTPServer # Loopback http server infrastructure class LoopbackHttpServer(BaseHTTPServer.HTTPServer): """HTTP server w/ a few modifications that make it useful for loopback testing purposes. """ def __init__(self, server_address, RequestHandlerClass): BaseHTTPServer.HTTPServer.__init__(self, server_address, RequestHandlerClass) # Set the timeout of our listening socket really low so # that we can stop the server easily. self.socket.settimeout(1.0) def get_request(self): """BaseHTTPServer method, overridden.""" request, client_address = self.socket.accept() # It's a loopback connection, so setting the timeout # really low shouldn't affect anything, but should make # deadlocks less likely to occur. request.settimeout(10.0) return (request, client_address) class LoopbackHttpServerThread(threading.Thread): """Stoppable thread that runs a loopback http server.""" def __init__(self, port, RequestHandlerClass): threading.Thread.__init__(self) self._RequestHandlerClass = RequestHandlerClass self._stop = False self._port = port self._server_address = ('127.0.0.1', self._port) self.ready = threading.Event() def stop(self): """Stops the webserver if it's currently running.""" # Set the stop flag. self._stop = True self.join() def run(self): protocol = "HTTP/1.0" self._RequestHandlerClass.protocol_version = protocol httpd = LoopbackHttpServer(self._server_address, self._RequestHandlerClass) sa = httpd.socket.getsockname() #print "Serving HTTP on", sa[0], "port", sa[1], "..." 
self.ready.set() while not self._stop: httpd.handle_request() # Authentication infrastructure class DigestAuthHandler: """Handler for performing digest authentication.""" def __init__(self): self._request_num = 0 self._nonces = [] self._users = {} self._realm_name = "Test Realm" self._qop = "auth" def set_qop(self, qop): self._qop = qop def set_users(self, users): assert isinstance(users, dict) self._users = users def set_realm(self, realm): self._realm_name = realm def _generate_nonce(self): self._request_num += 1 nonce = hashlib.md5(str(self._request_num)).hexdigest() self._nonces.append(nonce) return nonce def _create_auth_dict(self, auth_str): first_space_index = auth_str.find(" ") auth_str = auth_str[first_space_index+1:] parts = auth_str.split(",") auth_dict = {} for part in parts: name, value = part.split("=") name = name.strip() if value[0] == '"' and value[-1] == '"': value = value[1:-1] else: value = value.strip() auth_dict[name] = value return auth_dict def _validate_auth(self, auth_dict, password, method, uri): final_dict = {} final_dict.update(auth_dict) final_dict["password"] = password final_dict["method"] = method final_dict["uri"] = uri HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict HA1 = hashlib.md5(HA1_str).hexdigest() HA2_str = "%(method)s:%(uri)s" % final_dict HA2 = hashlib.md5(HA2_str).hexdigest() final_dict["HA1"] = HA1 final_dict["HA2"] = HA2 response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \ "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict response = hashlib.md5(response_str).hexdigest() return response == auth_dict["response"] def _return_auth_challenge(self, request_handler): request_handler.send_response(407, "Proxy Authentication Required") request_handler.send_header("Content-Type", "text/html") request_handler.send_header( 'Proxy-Authenticate', 'Digest realm="%s", ' 'qop="%s",' 'nonce="%s", ' % \ (self._realm_name, self._qop, self._generate_nonce())) # XXX: Not sure if we're supposed to add this next header or # not. #request_handler.send_header('Connection', 'close') request_handler.end_headers() request_handler.wfile.write("Proxy Authentication Required.") return False def handle_request(self, request_handler): """Performs digest authentication on the given HTTP request handler. Returns True if authentication was successful, False otherwise. If no users have been set, then digest auth is effectively disabled and this method will always return True. """ if len(self._users) == 0: return True if not request_handler.headers.has_key('Proxy-Authorization'): return self._return_auth_challenge(request_handler) else: auth_dict = self._create_auth_dict( request_handler.headers['Proxy-Authorization'] ) if self._users.has_key(auth_dict["username"]): password = self._users[ auth_dict["username"] ] else: return self._return_auth_challenge(request_handler) if not auth_dict.get("nonce") in self._nonces: return self._return_auth_challenge(request_handler) else: self._nonces.remove(auth_dict["nonce"]) auth_validated = False # MSIE uses short_path in its validation, but Python's # urllib2 uses the full path, so we're going to see if # either of them works here. 
        for path in [request_handler.path, request_handler.short_path]:
            if self._validate_auth(auth_dict,
                                   password,
                                   request_handler.command,
                                   path):
                auth_validated = True

        if not auth_validated:
            return self._return_auth_challenge(request_handler)
        return True

# Proxy test infrastructure

class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire internet has
    gone down due to a sudden zombie invasion.  It main utility is in
    providing us with authentication support for testing.
    """

    digest_auth_handler = DigestAuthHandler()

    def log_message(self, format, *args):
        # Uncomment the next line for debugging.
        #sys.stderr.write(format % args)
        pass

    def do_GET(self):
        (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
            self.path, 'http')
        self.short_path = path
        if self.digest_auth_handler.handle_request(self):
            self.send_response(200, "OK")
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write("You've reached %s!" % self.path)
            self.wfile.write("Our apologies, but our server is down due to "
                             "a sudden zombie invasion.")

# Test cases

class ProxyAuthTests(unittest.TestCase):
    URL = "http://www.foo.com"
    PORT = 8080

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    PROXY_URL = "http://127.0.0.1:%d" % PORT

    def setUp(self):
        FakeProxyHandler.digest_auth_handler.set_users({
            self.USER : self.PASSWD
            })
        FakeProxyHandler.digest_auth_handler.set_realm(self.REALM)

        self.server = LoopbackHttpServerThread(self.PORT, FakeProxyHandler)
        self.server.start()
        self.server.ready.wait()

        handler = urllib2.ProxyHandler({"http" : self.PROXY_URL})
        self._digest_auth_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self._digest_auth_handler)

    def tearDown(self):
        self.server.stop()

    def test_proxy_with_bad_password_raises_httperror(self):
        self._digest_auth_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        FakeProxyHandler.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        FakeProxyHandler.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self._digest_auth_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        FakeProxyHandler.digest_auth_handler.set_qop("auth")
        result = self.opener.open(self.URL)
        while result.read():
            pass
        result.close()

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self._digest_auth_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        FakeProxyHandler.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            while result.read():
                pass
            result.close()

def test_main():
    # We will NOT depend on the network resource flag
    # (Lib/test/regrtest.py -u network) since all tests here are only
    # localhost.  However, if this is a bad rationale, then uncomment
    # the next line.
    #test_support.requires("network")

    test_support.run_unittest(ProxyAuthTests)

if __name__ == "__main__":
    test_main()
diff --git a/greentest/timer_test.py b/greentest/timer_test.py
index 657656a..e774bcc 100644
--- a/greentest/timer_test.py
+++ b/greentest/timer_test.py
@@ -1,68 +1,68 @@
# @author Donovan Preston
#
# Copyright (c) 2006-2007, Linden Research, Inc.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

from unittest import TestCase, main
-from eventlet import api, timer
+from eventlib import api, timer

class TestTimer(TestCase):
    mode = 'static'

    def test_copy(self):
        t = timer.Timer(0, lambda: None)
        t2 = t.copy()
        assert t.seconds == t2.seconds
        assert t.tpl == t2.tpl
        assert t.called == t2.called

##     def test_cancel(self):
##         r = runloop.RunLoop()
##         called = []
##         t = timer.Timer(0, lambda: called.append(True))
##         t.cancel()
##         r.add_timer(t)
##         r.add_observer(lambda r, activity: r.abort(), 'after_waiting')
##         r.run()
##         assert not called
##         assert not r.running

    def test_schedule(self):
        hub = api.get_hub()
        # clean up the runloop, preventing side effects from previous tests
        # on this thread
        if hub.running:
            hub.abort()
            api.sleep(0)
        called = []
        #t = timer.Timer(0, lambda: (called.append(True), hub.abort()))
        #t.schedule()
        # let's have a timer somewhere in the future; make sure abort() still works
        # (for libevent, its dispatcher() does not exit if there is something scheduled)
        # XXX libevent handles this, other hubs do not
        #api.get_hub().schedule_call_global(10000, lambda: (called.append(True), hub.abort()))
        api.get_hub().schedule_call_global(0, lambda: (called.append(True), hub.abort()))
        hub.default_sleep = lambda: 0.0
        hub.switch()
        assert called
        assert not hub.running

if __name__ == '__main__':
    main()
diff --git a/greentest/tpool_test.py b/greentest/tpool_test.py
index 4ffada1..39ab4e5 100644
--- a/greentest/tpool_test.py
+++ b/greentest/tpool_test.py
@@ -1,216 +1,216 @@
# Copyright (c) 2007, Linden Research, Inc.
# Copyright (c) 2007, IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sys import stdout import time from unittest import TestCase, main import random import uuid -from eventlet import coros, api, tpool +from eventlib import coros, api, tpool r = random.WichmannHill() _g_debug = False def prnt(msg): if _g_debug: print msg class yadda(object): def __init__(self): pass def foo(self,when,n=None): assert(n is not None) prnt("foo: %s, %s" % (when,n)) time.sleep(r.random()/20.0) return n def sender_loop(pfx): n = 0 obj = tpool.Proxy(yadda()) while n < 10: if not (n % 5): stdout.write('.') stdout.flush() api.sleep(0) now = time.time() prnt("%s: send (%s,%s)" % (pfx,now,n)) rv = obj.foo(now,n=n) prnt("%s: recv %s" % (pfx, rv)) assert(n == rv) api.sleep(0) n += 1 one = 1 two = 2 three = 3 class TestTpool(TestCase): def setUp(self): # turn off exception printing, because we'll be deliberately # triggering exceptions in our tests tpool.QUIET = True def tearDown(self): tpool.QUIET = False def test_a_buncha_stuff(self): pool = coros.CoroutinePool(max_size=10) waiters = [] for i in range(0,9): waiters.append(pool.execute(sender_loop,i)) for waiter in waiters: waiter.wait() def test_wrap_tuple(self): my_tuple = (1, 2) prox = tpool.Proxy(my_tuple) self.assertEqual(prox[0], 1) self.assertEqual(prox[1], 2) self.assertEqual(len(my_tuple), 2) def test_wrap_string(self): my_object = "whatever" prox = tpool.Proxy(my_object) self.assertEqual(str(my_object), str(prox)) self.assertEqual(len(my_object), len(prox)) self.assertEqual(my_object.join(['a', 'b']), prox.join(['a', 'b'])) def test_wrap_uniterable(self): # here we're treating the exception as just a normal class prox = tpool.Proxy(FloatingPointError()) def index(): prox[0] def key(): prox['a'] self.assertRaises(IndexError, index) self.assertRaises(TypeError, key) def test_wrap_dict(self): my_object = {'a':1} prox = tpool.Proxy(my_object) self.assertEqual('a', prox.keys()[0]) self.assertEqual(1, prox['a']) self.assertEqual(str(my_object), str(prox)) self.assertEqual(repr(my_object), repr(prox)) self.assertEqual(`my_object`, `prox`) def test_wrap_module_class(self): prox = tpool.Proxy(uuid) self.assertEqual(tpool.Proxy, type(prox)) id = prox.uuid4() self.assertEqual(id.get_version(), uuid.uuid4().get_version()) self.assert_(repr(prox.uuid4)) def test_wrap_eq(self): prox = tpool.Proxy(uuid) id1 = prox.uuid4() id2 = prox.UUID(str(id1)) self.assertEqual(id1, id2) id3 = prox.uuid4() self.assert_(id1 != id3) def test_wrap_nonzero(self): prox = tpool.Proxy(uuid) id1 = prox.uuid4() self.assert_(bool(id1)) prox2 = tpool.Proxy([1, 2, 3]) self.assert_(bool(prox2)) def test_multiple_wraps(self): prox1 = tpool.Proxy(uuid) prox2 = tpool.Proxy(uuid) x1 = prox1.uuid4() x2 = prox1.uuid4() del x2 x3 = prox2.uuid4() def test_wrap_getitem(self): prox = tpool.Proxy([0,1,2]) self.assertEqual(prox[0], 0) def test_wrap_setitem(self): prox = tpool.Proxy([0,1,2]) prox[1] = 2 self.assertEqual(prox[1], 2) def test_raising_exceptions(self): prox = tpool.Proxy(uuid) def nofunc(): prox.never_name_a_function_like_this() self.assertRaises(AttributeError, nofunc) def assertLessThan(self, a, b): self.assert_(a < b, "%s is not less than %s" % (a, b)) def test_variable_and_keyword_arguments_with_function_calls(self): import optparse parser = tpool.Proxy(optparse.OptionParser()) z = parser.add_option('-n', action='store', type='string', dest='n') opts,args = parser.parse_args(["-nfoo"]) self.assertEqual(opts.n, 'foo') def test_contention(self): from greentest import tpool_test prox = tpool.Proxy(tpool_test) pool = coros.CoroutinePool(max_size=4) waiters = [] 
        waiters.append(pool.execute(lambda: self.assertEquals(prox.one, 1)))
        waiters.append(pool.execute(lambda: self.assertEquals(prox.two, 2)))
        waiters.append(pool.execute(lambda: self.assertEquals(prox.three, 3)))
        for waiter in waiters:
            waiter.wait()

    def test_timeout(self):
        import time
        api.exc_after(0.1, api.TimeoutError())
        self.assertRaises(api.TimeoutError,
                          tpool.execute, time.sleep, 0.3)

    def dont_test_benchmark(self):
        """ Benchmark computing the amount of overhead tpool adds to
        function calls.  Rename to activate."""
        iterations = 10000
        def bench(f, *args, **kw):
            for i in xrange(iterations):
                f(*args, **kw)
        def noop():
            pass
        normal_results = []
        tpool_results = []
        for i in xrange(3):
            start = time.time()
            bench(noop)
            end = time.time()
            normal_results.append(end-start)
            start = time.time()
            bench(tpool.execute, noop)
            end = time.time()
            tpool_results.append(end-start)
        avg_normal = sum(normal_results)/len(normal_results)
        avg_tpool = sum(tpool_results)/len(tpool_results)
        tpool_overhead = (avg_tpool-avg_normal)/iterations
        print "%s iterations\nTpool overhead is %s seconds per call.  Normal: %s; Tpool: %s" % (
            iterations, tpool_overhead, normal_results, tpool_results)

if __name__ == '__main__':
    main()
diff --git a/greentest/with_eventlet.py b/greentest/with_eventlet.py
index ecbed7a..cb904e1 100755
--- a/greentest/with_eventlet.py
+++ b/greentest/with_eventlet.py
@@ -1,75 +1,75 @@
#!/usr/bin/python2
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Execute python script with hub installed.

Usage: %prog [--hub HUB] [--reactor REACTOR] program.py
"""
import sys

def import_reactor(reactor):
    m = __import__('twisted.internet.' + reactor)
    return getattr(m.internet, reactor)

def setup_hub(hub, reactor):
    if reactor is not None:
        import_reactor(reactor).install()
    if hub is not None:
-        from eventlet.api import use_hub
+        from eventlib.api import use_hub
        try:
            use_hub(hub)
        except ImportError, ex:
            # as a shortcut, try to import the reactor with such name
            try:
                r = import_reactor(hub)
            except ImportError:
                sys.exit('No hub %s: %s' % (hub, ex))
            else:
                r.install()
                use_hub('twistedr')

def parse_args():
    hub = None
    reactor = None
-    del sys.argv[0] # kill with_eventlet.py
+    del sys.argv[0] # kill with_eventlib.py
    if sys.argv[0]=='--hub':
        del sys.argv[0]
        hub = sys.argv[0]
        del sys.argv[0]
    if sys.argv[0]=='--reactor':
        del sys.argv[0]
        reactor = sys.argv[0]
        del sys.argv[0]
    return hub, reactor

if __name__=='__main__':
    hub, reactor = parse_args()
    setup_hub(hub, reactor)
-    from eventlet.api import get_hub
+    from eventlib.api import get_hub
    hub = get_hub() # set up the hub now
    print '===HUB=%r' % hub
    if 'twisted.internet.reactor' in sys.modules:
        print '===REACTOR=%r' % sys.modules['twisted.internet.reactor']
    sys.stdout.flush()
    execfile(sys.argv[0])
diff --git a/greentest/wsgi_test.py b/greentest/wsgi_test.py
index 7845a3c..a9565b3 100644
--- a/greentest/wsgi_test.py
+++ b/greentest/wsgi_test.py
@@ -1,296 +1,296 @@
# @author Donovan Preston
#
# Copyright (c) 2007, Linden Research, Inc.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import cgi import os from unittest import TestCase, main -from eventlet import api -from eventlet import wsgi -from eventlet import processes +from eventlib import api +from eventlib import wsgi +from eventlib import processes from greentest import find_command try: from cStringIO import StringIO except ImportError: from StringIO import StringIO def hello_world(env, start_response): if env['PATH_INFO'] == 'notexist': start_response('404 Not Found', [('Content-type', 'text/plain')]) return ["not found"] start_response('200 OK', [('Content-type', 'text/plain')]) return ["hello world"] def chunked_app(env, start_response): start_response('200 OK', [('Content-type', 'text/plain')]) yield "this" yield "is" yield "chunked" def big_chunks(env, start_response): start_response('200 OK', [('Content-type', 'text/plain')]) line = 'a' * 8192 for x in range(10): yield line class Site(object): def __init__(self): self.application = hello_world def __call__(self, env, start_response): return self.application(env, start_response) CONTENT_LENGTH = 'content-length' """ HTTP/1.1 200 OK Date: foo Content-length: 11 hello world """ class ConnectionClosed(Exception): pass def read_http(sock): fd = sock.makeGreenFile() response_line = fd.readline() if not response_line: raise ConnectionClosed raw_headers = fd.readuntil('\r\n\r\n').strip() #print "R", response_line, raw_headers headers = dict() for x in raw_headers.split('\r\n'): #print "X", x key, value = x.split(': ', 1) headers[key.lower()] = value if CONTENT_LENGTH in headers: num = int(headers[CONTENT_LENGTH]) body = fd.read(num) #print body else: body = None return response_line, headers, body class TestHttpd(TestCase): mode = 'static' def setUp(self): self.logfile = StringIO() self.site = Site() self.killer = api.spawn( wsgi.server, api.tcp_listener(('0.0.0.0', 12346)), self.site, max_size=128, log=self.logfile) def tearDown(self): api.kill(self.killer) def test_001_server(self): sock = api.connect_tcp( ('127.0.0.1', 12346)) fd = sock.makeGreenFile() fd.write('GET / HTTP/1.0\r\nHost: localhost\r\n\r\n') result = fd.read() fd.close() ## The server responds with the maximum version it supports self.assert_(result.startswith('HTTP'), result) self.assert_(result.endswith('hello world')) def test_002_keepalive(self): sock = api.connect_tcp( ('127.0.0.1', 12346)) fd = sock.makeGreenFile() fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') read_http(sock) fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') read_http(sock) fd.close() def test_003_passing_non_int_to_read(self): # This should go in greenio_test sock = api.connect_tcp( ('127.0.0.1', 12346)) fd = sock.makeGreenFile() fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') cancel = api.exc_after(1, RuntimeError) self.assertRaises(TypeError, fd.read, "This shouldn't work") cancel.cancel() fd.close() def test_004_close_keepalive(self): sock = api.connect_tcp( ('127.0.0.1', 12346)) fd = sock.makeGreenFile() fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') read_http(sock) fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') read_http(sock) fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') self.assertRaises(ConnectionClosed, read_http, sock) fd.close() def skip_test_005_run_apachebench(self): url = 'http://localhost:12346/' # ab is apachebench out = processes.Process(find_command('ab'), ['-c','64','-n','1024', '-k', url]) print out.read() def test_006_reject_long_urls(self): sock = api.connect_tcp( ('127.0.0.1', 12346)) path_parts = [] for ii in range(3000): 
path_parts.append('path') path = '/'.join(path_parts) request = 'GET /%s HTTP/1.0\r\nHost: localhost\r\n\r\n' % path fd = sock.makeGreenFile() fd.write(request) result = fd.readline() status = result.split(' ')[1] self.assertEqual(status, '414') fd.close() def test_007_get_arg(self): # define a new handler that does a get_arg as well as a read_body def new_app(env, start_response): body = env['wsgi.input'].read() a = cgi.parse_qs(body).get('a', [1])[0] start_response('200 OK', [('Content-type', 'text/plain')]) return ['a is %s, body is %s' % (a, body)] self.site.application = new_app sock = api.connect_tcp( ('127.0.0.1', 12346)) request = '\r\n'.join(( 'POST / HTTP/1.0', 'Host: localhost', 'Content-Length: 3', '', 'a=a')) fd = sock.makeGreenFile() fd.write(request) # send some junk after the actual request fd.write('01234567890123456789') reqline, headers, body = read_http(sock) self.assertEqual(body, 'a is a, body is a=a') fd.close() def test_008_correctresponse(self): sock = api.connect_tcp( ('127.0.0.1', 12346)) fd = sock.makeGreenFile() fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') response_line_200,_,_ = read_http(sock) fd.write('GET /notexist HTTP/1.1\r\nHost: localhost\r\n\r\n') response_line_404,_,_ = read_http(sock) fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') response_line_test,_,_ = read_http(sock) self.assertEqual(response_line_200,response_line_test) fd.close() def test_009_chunked_response(self): self.site.application = chunked_app sock = api.connect_tcp( ('127.0.0.1', 12346)) fd = sock.makeGreenFile() fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') self.assert_('Transfer-Encoding: chunked' in fd.read()) def test_010_no_chunked_http_1_0(self): self.site.application = chunked_app sock = api.connect_tcp( ('127.0.0.1', 12346)) fd = sock.makeGreenFile() fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: close\r\n\r\n') self.assert_('Transfer-Encoding: chunked' not in fd.read()) def test_011_multiple_chunks(self): self.site.application = big_chunks sock = api.connect_tcp( ('127.0.0.1', 12346)) fd = sock.makeGreenFile() fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') headers = fd.readuntil('\r\n\r\n') self.assert_('Transfer-Encoding: chunked' in headers) chunks = 0 chunklen = int(fd.readline(), 16) while chunklen: chunks += 1 chunk = fd.read(chunklen) fd.readline() chunklen = int(fd.readline(), 16) self.assert_(chunks > 1) def test_012_ssl_server(self): - from eventlet import httpc + from eventlib import httpc def wsgi_app(environ, start_response): start_response('200 OK', {}) return [environ['wsgi.input'].read()] certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') sock = api.ssl_listener(('', 4201), certificate_file, private_key_file) api.spawn(wsgi.server, sock, wsgi_app) result = httpc.post("https://localhost:4201/foo", "abc") self.assertEquals(result, 'abc') def test_013_empty_return(self): - from eventlet import httpc + from eventlib import httpc def wsgi_app(environ, start_response): start_response("200 OK", []) return [""] certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') sock = api.ssl_listener(('', 4202), certificate_file, private_key_file) api.spawn(wsgi.server, sock, wsgi_app) res = httpc.get("https://localhost:4202/foo") self.assertEquals(res, '') if __name__ == '__main__': main()
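
The tpool tests in this patch exercise two entry points for running blocking code without stalling the cooperative hub: tpool.execute(), which runs a single call in an OS thread while only the calling coroutine waits, and tpool.Proxy, which wraps an object or module so that attribute access and method calls are dispatched to the thread pool. A minimal sketch of both, assuming only the eventlib package layout introduced by this patch (the snippet itself is illustrative and not part of the patch):

    from eventlib import tpool
    import time
    import uuid

    # Run one blocking call in a worker OS thread; only the calling
    # coroutine is suspended until the result comes back (tpool_test.py
    # does the same with tpool.execute(time.sleep, 0.3)).
    tpool.execute(time.sleep, 0.2)

    # Wrap a module so calls made through the proxy are pushed to the
    # thread pool, as tpool_test.py does with the uuid module.
    green_uuid = tpool.Proxy(uuid)
    unique_id = green_uuid.uuid4()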