国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

Python使用 Beanstalkd 做異步任務處理的方法

2020-02-22 23:51:39
字體:
來源:轉載
供稿:網友

使用 Beanstalkd 作為消息隊列服務,然后結合 Python 的裝飾器語法實現一個簡單的異步任務處理工具.

最終效果

定義任務:

from xxxxx.job_queue import JobQueuequeue = JobQueue()@queue.task('task_tube_one')def task_one(arg1, arg2, arg3): # do task

提交任務:

task_one.put(arg1="a", arg2="b", arg3="c")

然后就可以由后臺的 work 線程去執行這些任務了。

實現過程

1、了解 Beanstalk Server

Beanstalk is a simple, fast work queue. https://github.com/kr/beanstalkd

Beanstalk 是一個 C 語言實現的消息隊列服務。 它提供了通用的接口,最初設計的目的是通過異步運行耗時的任務來減少大量Web應用程序中的頁面延遲。針對不同的語言,有不同的 Beanstalkd Client 實現。 Python 里就有 beanstalkc 等。我就是利用 beanstalkc 來作為與 beanstalkd server 通信的工具。

2、任務異步執行實現原理

beanstalkd 只能進行字符串的任務調度。為了讓程序支持提交函數和參數,然后由woker執行函數并攜帶參數。需要一個中間層來將函數與傳遞的參數注冊。

實現主要包括3個部分:

Subscriber: 負責將函數注冊到 beanstalk 的一個tube上,實現很簡單,注冊函數名和函數本身的對應關系。(也就意味著同一個分組(tube)下不能有相同函數名存在)。數據存儲在類變量里。

class Subscriber(object): FUN_MAP = defaultdict(dict) def __init__(self, func, tube):  logger.info('register func:{} to tube:{}.'.format(func.__name__, tube))  Subscriber.FUN_MAP[tube][func.__name__] = func

JobQueue: 方便將一個普通函數轉換為具有 Putter 能力的裝飾器

class JobQueue(object): @classmethod def task(cls, tube):  def wrapper(func):   Subscriber(func, tube)   return Putter(func, tube)  return wrapper

Putter: 將函數名、函數參數、指定的分組組合為一個對象,然后 json 序列化為字符串,最后通過 beanstalkc 推送到beanstalkd 隊列。

class Putter(object): def __init__(self, func, tube):  self.func = func  self.tube = tube # 直接調用返回 def __call__(self, *args, **kwargs):  return self.func(*args, **kwargs) # 推給離線隊列 def put(self, **kwargs):  args = {   'func_name': self.func.__name__,   'tube': self.tube,   'kwargs': kwargs  }  logger.info('put job:{} to queue'.format(args))  beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])  try:   beanstalk.use(self.tube)   job_id = beanstalk.put(json.dumps(args))   return job_id  finally:   beanstalk.close()

Worker: 從 beanstalkd 隊列中取出字符串,然后通過 json.loads 反序列化為對象,獲得 函數名、參數和tube。最后從 Subscriber 中獲得 函數名對應的函數代碼,然后傳遞參數執行函數。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 甘孜| 社会| 蒙阴县| 仲巴县| 银川市| 孟州市| 大港区| 平泉县| 台北市| 儋州市| 偏关县| 工布江达县| 辽中县| 西青区| 连云港市| 云浮市| 涿州市| 岑溪市| 广昌县| 扬中市| 崇明县| 乐陵市| 隆尧县| 岳西县| 剑河县| 雷波县| 馆陶县| 交城县| 井研县| 溧阳市| 林芝县| 昭觉县| 苍南县| 明溪县| 类乌齐县| 罗源县| 扎囊县| 清新县| 东兰县| 山阴县| 土默特左旗|