diff --git a/xcap/xcapdiff.py b/xcap/xcapdiff.py
index 8995288..6fc817e 100644
--- a/xcap/xcapdiff.py
+++ b/xcap/xcapdiff.py
@@ -1,194 +1,158 @@
"""Track changes of the documents and notify subscribers
Create a Notifier object:
>>> n = Notifier(xcap_root, publish_xcapdiff_func)
When a change occurs, call on_change
>>> n.on_change(xcap_uri_updated, old_etag, new_etag)
(old_etag being None means the document was just created, new_etag being
None means the document was deleted)
Notifier will call publish_xcapdiff_func with 2 args: user's uri and xcap-diff document.
Number of calls is limited to no more than 1 call per MIN_WAIT seconds for
a given user uri.
"""
-
-from time import time
+import asyncio
from functools import wraps
-from twisted.internet import reactor
+from time import time
+from typing import Any, Dict, List, Optional, Union
+
+from xcap.configuration.datatypes import XCAPRootURI
+from xcap.types import PublishFunction, PublishWrapper
+from xcap.uri import XCAPUri
-def xml_xcapdiff(xcap_root, content):
+
+def xml_xcapdiff(xcap_root: XCAPRootURI, content: str) -> str:
return """
%s
""" % (xcap_root, content)
-def xml_document(sel, old_etag, new_etag):
+
+def xml_document(sel: XCAPUri, old_etag: str, new_etag: Union[str, None]) -> str:
if old_etag:
- old_etag = ( ' previous-etag="%s"' % old_etag )
+ old_etag = (' previous-etag="%s"' % old_etag)
else:
old_etag = ''
if new_etag:
- new_etag = ( ' new-etag="%s"' % new_etag )
+ new_etag = (' new-etag="%s"' % new_etag)
else:
new_etag = ''
return '' % (new_etag, sel, old_etag)
class UserChanges(object):
+ MIN_WAIT = 30
- MIN_WAIT = 5
-
- def __init__(self, publish_xcapdiff):
- self.changes = {}
+ def __init__(self, publish_xcapdiff: PublishFunction):
+ self.changes: Dict[XCAPUri, List[Any]] = {}
self.rate_limit = RateLimit(self.MIN_WAIT)
self.publish_xcapdiff = publish_xcapdiff
- def add_change(self, uri, old_etag, etag, xcap_root):
+ async def add_change(self, uri: XCAPUri, old_etag: str, etag: Union[str, None], xcap_root: XCAPRootURI) -> None:
self.changes.setdefault(uri, [old_etag, etag])[1] = etag
- self.rate_limit.callAtLimitedRate(self.publish, uri.user.uri, xcap_root)
+ await self.rate_limit.callAtLimitedRate(self.publish, uri.user.uri, xcap_root)
- def publish(self, user_uri, xcap_root):
+ async def publish(self, user_uri: str, xcap_root: XCAPRootURI) -> None:
if self.changes:
self.publish_xcapdiff(user_uri, self.unload_changes(xcap_root))
-
- def unload_changes(self, xcap_root):
+
+ def unload_changes(self, xcap_root: XCAPRootURI) -> str:
docs = []
for uri, (old_etag, etag) in self.changes.items():
docs.append(xml_document(uri, old_etag, etag))
result = xml_xcapdiff(xcap_root, '\n'.join(docs))
self.changes.clear()
return result
- def __bool__(self):
- return self.changes.__nonzero__()
+ def __bool__(self) -> bool:
+ return bool(self.changes)
class Notifier(object):
-
- def __init__(self, xcap_root, publish_xcapdiff_func):
+ def __init__(self, xcap_root: XCAPRootURI, publish_xcapdiff_func: PublishFunction) -> None:
self.publish_xcapdiff = publish_xcapdiff_func
self.xcap_root = xcap_root
# maps user_uri to UserChanges
- self.users_changes = {}
-
- def on_change(self, uri, old_etag, new_etag):
- changes = self.users_changes.setdefault(uri.user, UserChanges(self.publish_xcapdiff))
- changes.add_change(uri, old_etag, new_etag, self.xcap_root)
+ self.users_changes: Dict[str, UserChanges] = {}
+ async def on_change(self, uri: XCAPUri, old_etag: str, new_etag: Optional[str]) -> None:
+ changes = self.users_changes.setdefault(str(uri.user), UserChanges(self.publish_xcapdiff))
+ await changes.add_change(uri, old_etag, new_etag, self.xcap_root)
-class RateLimit(object):
- def __init__(self, min_wait):
- # minimum number of seconds between calls
+class RateLimit:
+ def __init__(self, min_wait: int):
self.min_wait = min_wait
+ self.last_call = 0.0
+ self.delayed_call: Optional[asyncio.Task] = None
- # time() of the last call
- self.last_call = 0
-
- # DelayedCall object of scheduled call
- self.delayed_call = None
-
- def callAtLimitedRate(self, f, *args, **kwargs):
- """Call f(*args, **kw) if it wasn't called in the last self.min_wait seconds.
- If it was, schedule it for later. Don't do anything if it's already scheduled.
-
- >>> rate = RateLimit(1)
-
- >>> def f(a, start = time()):
- ... print "%d %s" % (time()-start, a)
- ... return 'return value is lost!'
-
- >>> rate.callAtLimitedRate(f, 'a')
- 0 a
- >>> rate.callAtLimitedRate(f, 'b') # scheduled for 1 second later
- >>> rate.callAtLimitedRate(f, 'c') # ignored as there's already call in progress
- >>> _ = reactor.callLater(1.5, rate.callAtLimitedRate, f, 'd')
- >>> _ = reactor.callLater(2.1, reactor_stop)
- >>> reactor_run()
- 1 b
- 2 d
- """
+ async def callAtLimitedRate(self, f: PublishWrapper, *args, **kwargs) -> None:
current = time()
delta = current - self.last_call
- if not self.delayed_call or \
- self.delayed_call.called or \
- self.delayed_call.cancelled:
- @wraps(f)
- def wrapped_f():
- try:
- return f(*args, **kwargs)
- finally:
- self.last_call = time()
- self.delayed_call = callMaybeLater(self.min_wait - delta, wrapped_f)
+ if self.delayed_call is None or self.delayed_call.done():
+ await self._schedule(f, args, kwargs, delta)
+
+ async def _schedule(self, f: PublishWrapper, args, kwargs, delta: float) -> None:
+ if delta >= self.min_wait:
+ self.last_call = time()
+ await f(*args, **kwargs)
+ else:
+ self.delayed_call = asyncio.create_task(self._delayed_call(f, args, kwargs, delta))
+ async def _delayed_call(self, f: PublishWrapper, args, kwargs, delta: float) -> None:
+ await asyncio.sleep(self.min_wait - delta) # Wait for the remaining time
+ self.last_call = time() # Update the last call time
+ await f(*args, **kwargs) # Call the function
+ self.delayed_call = None # Clear the delayed call once it's executed
-class RateLimitedFun(RateLimit):
- def __init__(self, min_wait, function):
+class RateLimitedFun(RateLimit):
+ def __init__(self, min_wait: int, function):
RateLimit.__init__(self, min_wait)
self.function = function
def __call__(self, *args, **kwargs):
return self.callAtLimitedRate(self.function, *args, **kwargs)
-def limit_rate(min_wait):
+def limit_rate(min_wait: int):
"""Decorator for limiting rate of the function.
The resulting value of the new function will be None regardless of
what the wrapped function returned.
>>> @limit_rate(1)
... def f(a, start = time()):
... print "%d %s" % (time()-start, a)
... return 'return value is lost!'
>>> f('a')
0 a
>>> f('b') # scheduled for 1 second later
>>> f('c') # ignored as there's already call in progress
>>> _ = reactor.callLater(1.5, f, 'd')
>>> _ = reactor.callLater(2.1, reactor_stop)
>>> reactor_run()
1 b
2 d
"""
rate = RateLimit(min_wait)
-
- def decorate(f):
+
+ def decorate(f: PublishFunction):
@wraps(f)
- def wrapped(*args, **kwargs):
- rate.callAtLimitedRate(f, *args, **kwargs)
+ async def wrapped(*args, **kwargs):
+ await rate.callAtLimitedRate(f, *args, **kwargs)
return wrapped
return decorate
-def callMaybeLater(seconds, f, *args, **kw):
- "execute f and return None if seconds is zero, callLater otherwise"
- if seconds <= 0:
- f(*args, **kw)
- else:
- return reactor.callLater(seconds, f, *args, **kw)
-
-if __name__=='__main__':
- def reactor_run(first_time = [True]):
- if first_time[0]:
- reactor.run()
- first_time[0] = False
- else:
- reactor.running = True
- reactor.mainLoop()
-
- def reactor_stop():
- reactor.running = False
-
+if __name__ == '__main__':
import doctest
doctest.testmod()
-