本篇文章给大家分享的是有关Python中怎么利用backoff实现轮询,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
创新互联建站是一家专业提供临洮企业网站建设,专注与成都网站设计、成都做网站、HTML5、小程序制作等业务。10年已为临洮众多企业、政府机构等服务。创新互联专业网站制作公司优惠进行中。
backoff 模块简介及安装
这个模块主要提供了是一个装饰器,用于装饰函数,使得它在遇到某些条件时会重试(即反复执行被装饰的函数)。通常适用于我们在获取一些不可靠资源,比如会间歇性故障的资源等。
此外,装饰器支持正常的同步方法,也支持异步asyncio代码。
backoff 模块的安装也很简单,通过 pip 即可安装完成:
pip install backoff
backoff 用法及简单源码分析
backoff 提供两个主要的装饰器,通过 backoff. 调用,通过提示我们可以看到这两个装饰器,分别是:
backoff.on_predicatebackoff.on_exception
通过 github 查看 backoff 的源码,源码目录 backoff/_decorator.py,定义如下:
def on_predicate(wait_gen, predicate=operator.not_, max_tries=None, max_time=None, jitter=full_jitter, on_success=None, on_backoff=None, on_giveup=None, logger='backoff', **wait_gen_kwargs): # 省略具体代码 # 每个参数的定义在源码中都给出了明确的解释 pass def on_exception(wait_gen, exception, max_tries=None, max_time=None, jitter=full_jitter, giveup=lambda e: False, on_success=None, on_backoff=None, on_giveup=None, logger='backoff', **wait_gen_kwargs): # 省略具体代码 # 每个参数的定义在源码中都给出了明确的解释 pass
可以看到,定义了很多的参数,这些参数在源码中都给出了比较详细的解释,这里做简单的介绍:
首先,wait_gen:表示每次循环等待的时长,以秒为单位。它的类型是一个生成器,在 backoff 中内置了三个生成器。我们查看下源码,目录为 backoff/_wait_gen.py。我们取其中一个的详细实现来看下:
# 省略实现代码# base * factor * ndef expo(base=2, factor=1, max_value=None): """Generator for exponential decay. Args: base: The mathematical base of the exponentiation operation factor: Factor to multiply the exponentation by. max_value: The maximum value to yield. Once the value in the true exponential sequence exceeds this, the value of max_value will forever after be yielded. """ n = 0 while True: a = factor * base ** n if max_value is None or a < max_value: yield a n += 1 else: yield max_value# 通过斐波那契数列控制def fibo(max_value=None): pass# 常量数值def constant(interval=1): pass
从源码不难看出,通过一些策略,每次 yield 返回不同的数值,这些数值就是重试等待秒数。当然因为这个参数类型是生成器,显然我们也是可以自定义的。同时我们会发现每个 wait_gen 都是参数控制的,所以我们理应是可以修改这个参数的初始值的。
显然,wait_gen_kwargs就是用来传递这些参数的,它是通过可变关键字参数控制的,可以直接用 key=value 的形式进行传参,简单示例如下:
@backoff.on_predicate(backoff.constant, interval=5)def main3(): print("time is {} retry...".format(time.time()))
predict 与 exception。这两个相对比较简单,predict 接受一个函数,当这个函数返回 True 时会进行重试,否则停止,同时这个函数接受一个参数,这个参数的值是被装饰函数的返回值。这个参数的默认值是:operator._not。这个函数的源码如下:
def not_(a): "Same as not a." return not a
所以默认返回的是 not 被装饰函数的返回值。如果当被装饰函数并没有返回值时,返回 True,会进行重试。
示例代码如下:
import backoffimport time@backoff.on_predicate(backoff.fibo)def test2(): print("time is {}, retry...".format(time.time()))if __name__ == "__main__": test2()# 等价于:# 必须接受一个参数,这个参数的值是被装饰函数的返回值def condition(r): return True @backoff.on_predicate(backoff.fibo, condition)def test2(): print("time is {}, retry...".format(time.time()))if __name__ == "__main__": test2()
执行结果如下:
$ python3 backoff_test.pytime is 1571801845.834578, retry...time is 1571801846.121314, retry...time is 1571801846.229812, retry...time is 1571801846.533237, retry...time is 1571801849.460303, retry...time is 1571801850.8974788, retry...time is 1571801856.498335, retry...time is 1571801861.56931, retry...time is 1571801872.701226, retry...time is 1571801879.198495, retry... ...
需要注意几点:
如果自定义这个参数对应的函数,这个函数是需要接受一个参数的,这个参数的值是被装饰函数的返回值。我们可以通过控制这个返回值来做一些条件判断,当达到某些特殊条件时重试结束。
示例中 wait_gen 用的是 backoff.fibo,注意观察输出的时间单隔,这里的时间间隔好像并不像我们想象中按 fibo 返回的时间间隔数,实际上如果想达到这个效果,我们需要将 jitter 参数设置为 None,后面介绍 jitter 参数时再做说明。
而 exception 则是接受异常类型的实例,可以是单个异常,也可以是元组形式的多个异常。简单示例如下:
import timeimport randomimport backofffrom collections import dequeclass MyException(Exception): def __init__(self, message, status): super().__init__(message, status) self.message = message self.status = statusclass MyException2(Exception): pass@backoff.on_exception(backoff.expo, (MyException, MyException2))def main(): random_num = random.randint(0, 9) print("retry...and random num is {}".format(random_num)) if random_num % 2 == 0: raise MyException("my exception", int("1000" + str(random_num))) raise MyException2()
max_tries 与 max_time 也比较简单,分别代表最大重试次数与最长重试时间。这里就不做演示了。
@backoff.on_exception 中的 giveup,它接受一个异常实例,通过对这个实例做一些条件判断,达到判断是否需要继续循环的目的。如果返回 True,则结束,反之继续。默认值一直是返回 False,即会一直循环。示例如下:
import randomimport backoffclass MyException(Exception): def __init__(self, message, status): super().__init__(message, status) self.message = message self.status = statusdef exception_status(e): print('exception status code is {}'.format(e.status)) return e.status % 2 == 0 @backoff.on_exception(backoff.expo, MyException, giveup=exception_status)def main(): random_num = random.randint(0, 9) print("retry...and random num is {}".format(random_num)) raise MyException("my exception", int("1000" + str(random_num)))if __name__ == "__main__": main()
运行结果:
retry...and random num is 5exception status code is 10005retry...and random num is 0exception status code is 10000# 会再走一遍 raise 的代码,所以异常仍然会抛出来 Traceback (most recent call last): File "backoff_test.py", line 36, inmain() File "/Users/ruoru/code/python/exercise/.venv/lib/python3.7/site-packages/backoff/_sync.py", line 94, in retry ret = target(*args, **kwargs) File "backoff_test.py", line 32, in main raise MyException("my exception", int("1000" + str(random_num))) __main__.MyException: ('my exception', 10000)
需要注意两点:
这个参数接受的函数仍然只有一个参数,这个参数的值是一个异常实例对象
从结果我们可以看出,当抛出异常时,会先进入 giveup 接受的函数,如果函数判断需要 giveup 时,当前的异常仍然会抛出。所以有需要,代码仍然需要做异常逻辑处理。
on_success、on_backoff 与 on_giveup 这三个是一类的参数,用于做事件处理:
on_sucess 事件会比较难理解一点,它表示的是被装饰函数成功结束轮循则会退出,对于 on_exception 来说即当被装饰函数没有发生异常时则会调用 on_success。而对于 on_predicate 来说即是通过 predicate 关键字返回为 False 结束循环则会调用。
on_backoff 即当程序产生循环时会调用
on_giveup 当程序是达到当前可尝试最大次数后,会调用。对于 on_predicate 如果是通过 max_tries 或者 max_time 会调用,而对于 on_exception ,对于 exception 参数返回 True 时也会调用 on_giveup
总结来说,max_tries 和 max_time 这种直接控制结束的,调用的是 on_giveup,而 exception 参数也是通过返回 True 则程序就结束,它是用来控制程序结束的,所以也会调用 on_giveup。而 predicate 参数返回 True 则程序继续,它是用来控制程序是否继续徨的,所以当它结束时,调用的是 on_success。
实验代码如下:
''' @Author: ruoru @Date: 2019-10-22 15:30:32 @LastEditors: ruoru @LastEditTime: 2019-10-23 14:37:13 @Description: backoff '''import timeimport randomimport backoffclass MyException(Exception): def __init__(self, status, message): super().__init__(status, message) self.status = status self.message = messagedef backoff_hdlr(details): print("Backing off {wait:0.1f} seconds afters {tries} tries " "calling function {target} with args {args} and kwargs " "{kwargs}".format(**details))def success_hdlr(details): print("Success offafters {tries} tries " "calling function {target} with args {args} and kwargs " "{kwargs}".format(**details))def giveup_hdlr(details): print("Giveup off {tries} tries " "calling function {target} with args {args} and kwargs " "{kwargs}".format(**details))@backoff.on_predicate( backoff.constant, # 当 random num 不等 10009 则继续 # 当 random_num 等于 10009 后,会调用 on_success lambda x: x != 10009, on_success=success_hdlr, on_backoff=backoff_hdlr, on_giveup=giveup_hdlr, max_time=2)def main(): num = random.randint(10000, 10010) print("time is {}, num is {}, retry...".format(time.time(), num)) return num@backoff.on_exception( backoff.constant, MyException, # 当 Exception 实例对象的 status 为 10009 成立时退出 # 当条件成立时,调用的是 on_giveup giveup=lambda e: e.status == 10009, on_success=success_hdlr, on_backoff=backoff_hdlr, on_giveup=giveup_hdlr, )def main2(): num = random.randint(10000, 10010) print("time is {}, num is {}, retry...".format(time.time(), num)) # 如果是通过这个条件成立退出,调用的是 on_success if num == 10010: return raise MyException(num, "hhh")if __name__ == "__main__": #main() main2()
logger 参数,很显然就是用来控制日志输出的,这里不做详细介绍。copy 官方文档的一个示例:
my_logger = logging.getLogger('my_logger') my_handler = logging.StreamHandler() my_logger.add_handler(my_handler) my_logger.setLevel(logging.ERROR) @backoff.on_exception(backoff.expo, requests.exception.RequestException, logger=my_logger)# ...
最后一个参数,jitter,开始也不是很明白这个参数的作用,文档的解释如下:
jitter: A function of the value yielded by wait_gen returning the actual time to wait. This distributes wait times stochastically in order to avoid timing collisions across concurrent clients. Wait times are jittered by default using the full_jitter function. Jittering may be disabled altogether by passing jitter=None.
有点晕,于是去看了下源码,明白了用法,截取关键源码如下:
# backoff/_decorator.pydef on_predicate(wait_gen, predicate=operator.not_, max_tries=None, max_time=None, jitter=full_jitter, on_success=None, on_backoff=None, on_giveup=None, logger='backoff', **wait_gen_kwargs): pass # 省略 # 因为没有用到异步,所以会进到这里 if retry is None: retry = _sync.retry_predicate# backoff/_sync# 分析可以看到有一句获取下次 wait 时长seconds = _next_wait(wait, jitter, elapsed, max_time_)# backoff/_commondef _next_wait(wait, jitter, elapsed, max_time): value = next(wait) try: if jitter is not None: seconds = jitter(value) else: seconds = value except TypeError: warnings.warn( "Nullary jitter function signature is deprecated. Use " "unary signature accepting a wait value in seconds and " "returning a jittered version of it.", DeprecationWarning, stacklevel=2, ) seconds = value + jitter() # don't sleep longer than remaining alloted max_time if max_time is not None: seconds = min(seconds, max_time - elapsed) return seconds
看前面几行代码应该就会比较清晰了,如果 jitter 为 None,则会使用第一个参数返回的 value 值,而如果使用了,则会在这个 value 值上再做一次算法,默认为 full_jitter(value)。backoff/_jitter.py 提供了两个算法,代码不长,贴上来看看:
import randomdef random_jitter(value): """Jitter the value a random number of milliseconds. This adds up to 1 second of additional time to the original value. Prior to backoff version 1.2 this was the default jitter behavior. Args: value: The unadulterated backoff value. """ return value + random.random()def full_jitter(value): """Jitter the value across the full range (0 to value). This corresponds to the "Full Jitter" algorithm specified in the AWS blog's post on the performance of various jitter algorithms. (http://www.awsarchitectureblog.com/2015/03/backoff.html) Args: value: The unadulterated backoff value. """ return random.uniform(0, value)
以上就是Python中怎么利用backoff实现轮询,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。