国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

解決python3 pika之連接斷開的問題

2020-02-16 00:12:03
字體:
供稿:網(wǎng)友

問題描述

在消費(fèi)rabbitMQ隊(duì)列時(shí), 每次進(jìn)入回調(diào)函數(shù)內(nèi)需要進(jìn)行一些比較耗時(shí)的操作;操作完成后給rabbitMQ server發(fā)送ack信號(hào)以dequeue本條消息。

問題就發(fā)生在發(fā)送ack操作時(shí), 程序提示鏈接已被斷開或socket error。

源碼示例

#!/usr/bin#coding: utf-8import pikaimport timeUSER = 'guest'PWD = 'guest'TEST_QUEUE = 'just4test'def callback(ch, method, properties, body): print(body) time.sleep(600) ch.basic_publish('', routing_key=TEST_QUEUE, body="fortest") ch.basic_ack(delivery_tag = method.delivery_tag)def test_main(): s_conn = pika.BlockingConnection(  pika.ConnectionParameters('127.0.0.1',    credentials=pika.PlainCredentials(USER, PWD))) chan = s_conn.channel() chan.queue_declare(queue=TEST_QUEUE) chan.basic_publish('', routing_key=TEST_QUEUE, body="fortest") chan.basic_consume(callback, queue=TEST_QUEUE) chan.start_consuming()if __name__ == "__main__": test_main()

運(yùn)行一段時(shí)間后, 就會(huì)報(bào)錯(cuò):

[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Error event 25, None[CRITICAL][pika.adapters.base_connection][2017-08-18 12:33:49]Tried to handle an error where no error existed[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Fatal Socket Error: BrokenPipeError(32, 'Broken pipe')

問題排查

猜測:pika客戶端沒有及時(shí)發(fā)送心跳,連接被server斷開

一開始修改了heartbeat_interval參數(shù)值, 示例如下:

def test_main(): s_conn = pika.BlockingConnection(  pika.ConnectionParameters('127.0.0.1',    heartbeat_interval=10,   socket_timeout=5,   credentials=pika.PlainCredentials(USER, PWD))) # ....

修改后運(yùn)行依然報(bào)錯(cuò),后來想想應(yīng)該單線程被一直占用,pika無法發(fā)送心跳;

于是又加了個(gè)心跳線程, 示例如下:

#!/usr/bin#coding: utf-8import pikaimport timeimport loggingimport threadingUSER = 'guest'PWD = 'guest'TEST_QUEUE = 'just4test'class Heartbeat(threading.Thread): def __init__(self, connection):  super(Heartbeat, self).__init__()  self.lock = threading.Lock()  self.connection = connection  self.quitflag = False  self.stopflag = True  self.setDaemon(True) def run(self):  while not self.quitflag:   time.sleep(10)   self.lock.acquire()   if self.stopflag :    self.lock.release()    continue   try:    self.connection.process_data_events()   except Exception as ex:    logging.warn("Error format: %s"%(str(ex)))    self.lock.release()    return   self.lock.release() def startHeartbeat(self):  self.lock.acquire()  if self.quitflag==True:   self.lock.release()   return  self.stopflag=False  self.lock.release()def callback(ch, method, properties, body): logging.info("recv_body:%s" % body) time.sleep(600) ch.basic_ack(delivery_tag = method.delivery_tag)def test_main(): s_conn = pika.BlockingConnection(  pika.ConnectionParameters('127.0.0.1',    heartbeat_interval=10,   socket_timeout=5,   credentials=pika.PlainCredentials(USER, PWD))) chan = s_conn.channel() chan.queue_declare(queue=TEST_QUEUE) chan.basic_consume(callback,      queue=TEST_QUEUE) heartbeat = Heartbeat(s_conn) heartbeat.start()   #開啟心跳線程 heartbeat.startHeartbeat() chan.start_consuming()if __name__ == "__main__": test_main()            
發(fā)表評論 共有條評論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 望奎县| 庄河市| 姜堰市| 江山市| 勐海县| 鱼台县| 桓仁| 陆川县| 吴旗县| 建昌县| 河南省| 霍邱县| 伊金霍洛旗| 甘泉县| 婺源县| 高平市| 安仁县| 太仆寺旗| 西宁市| 济南市| 元朗区| 潜江市| 金川县| 鲁甸县| 共和县| 固原市| 萨迦县| 兴仁县| 海南省| 古田县| 丰镇市| 霍城县| 津市市| 怀安县| 城口县| 太仓市| 吴川市| 浦北县| 柏乡县| 灯塔市| 进贤县|