文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

基于redis乐观锁怎么实现并发排队

2023-07-04 20:57

关注

这篇“基于redis乐观锁怎么实现并发排队”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“基于redis乐观锁怎么实现并发排队”文章吧。

有个需求场景是这样的,使用redis控制scrapy运行的数量。当系统的后台设置为4时,只允许scapry启动4个任务,多余的任务则进行排队。

概况

最近做了一个django + scrapy + celery + redis 的爬虫系统,客户购买的主机除了跑其他程序外,还要跑我开发的这套程序,所以需要手动控制scrapy的实例数量,避免过多的爬虫给系统造成负担。

流程设计

爬虫任务由用户以请求的方式发起,所有的用户的请求统一进入到celery进行排队;
2、任务数量控制的执行就交给reids,经由celery保存到redis,包含了爬虫启动所需要的必要信息,从redis取一条信息即可启动一个爬虫;
3、通过scrapyd的接口来获取当前在运行的爬虫数量,以便决定下一步流程:如果小于4,则从redis中取相应数量的信息来启动爬虫,如果大于等于4,则继续等待;
4、如果在运行爬虫的数量有所减少,则及时从reids中取相应数量的信息来启动爬虫。

代码实现

业务代码有点复杂和啰嗦,此处使用伪代码来演示

import redis# 实例化一个redis连接池pool = redis.ConnectionPool(host='127.0.0.1', port=6379, decode_responses=True, db=4, password='')r = redis.Redis(connection_pool=pool)# 爬虫实例限制为4 即只允许4个scrapy实例在运行limited = 4# 声明redis的乐观锁lock = r.Lock()# lock.acquire中有while循环,即它会线程阻塞,直到当前线程获得redis的lock,才会继续往下执行代码if lock.acquire():# 1、从reids中取一条爬虫信息info = redis.get() # 2、while循环监听爬虫运行的数量while True:req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json()# 统计当前有多少个爬虫在运行running = req.get('running') + req.get('pending')# 3、判断是否等待还是要增加爬虫数量# 3.1 如果在运行的数量大于等于设置到量 则继续等待if running >= limited:continue# 3.2 如果小于 则启动爬虫start_scrapy(info)# 3.3 将info从redis中删除redis.delete(info)# 3.4 释放锁lock.release()break

当前,这只是伪代码而已,实际的业务逻辑可能是非常复杂的,如:

@shared_taskdef scrapy_control(key_uuid):    r = redis.Redis(connection_pool=pool)    db = MysqlDB()    speed_limited = db.fetch_config('REPTILE_SPEED')    speed_limited = int(speed_limited[0])    keywords_num = MysqlDB().fetch_config('SEARCH_RANDOM')    keywords_num = int(keywords_num[0])    # while True:    lock = r.lock('lock')    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 进入处理环节' +  '\n')    try:        # acquire默认阻塞 如果获取不到锁时 会一直阻塞在这个函数的while循环中        if lock.acquire():            with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 获得锁' +  '\n')            # 1 从redis中获取信息            redis_obj = json.loads(r.get(key_uuid))            user_id = redis_obj.get('user_id')            contents = redis_obj.get('contents')                        # 2 使用while循环处理核心逻辑                      is_hold_print = True            while True:                req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json()                running = req.get('running') + req.get('pending')                # 3 如果仍然有足够的爬虫在运行 则hold住redis锁,等待有空余的爬虫位置让出                if running >= speed_limited:                    if is_hold_print:                        with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 爬虫在运行,线程等待中' +  '\n')                        is_hold_print = False                    time.sleep(1)                    continue                                # 4 有空余的爬虫位置 则往下走                # 4.1 处理完所有的内容后 释放锁                if len(contents) == 0:                    r.delete(key_uuid)                    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 任务已完成,从redis中删除' +  '\n')                    lock.release()                    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 释放锁' +  '\n')                    break                # 4.2 创建task任务                task_uuid = str(uuid.uuid4())                article_obj = contents.pop()                article_id = article_obj.get('article_id')                article = article_obj.get('content')                try:                    Task.objects.create(                        task_uuid = task_uuid,                        user_id = user_id,                        article_id = article_id,                        content = article                    )                except Exception as e:                    with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + '->' + str(task_uuid) + ' 创建Task出错: ' + str(e) +  '\n')                # finally:                # 4.3 启动爬虫任务 即便创建task失败也会启动                try:                    task_chain(user_id, article, task_uuid, keywords_num)                except Exception as e:                    with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 启动任务链失败: ' + str(e) +  '\n')                                # 加入sleep 防止代码执行速度快于爬虫启动速度而导致当前线程启动额外的爬虫                time.sleep(5)    except Exception as e:        with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 获得锁之后的操作出错: ' + str(e) +  '\n')        lock.release()

小坑
scrapy启动速度相对较慢,所以while循环中,代码中执行到了爬虫的启动,需要sleep一下再去通过scrapyd接口获取爬虫运行的数量,如果立刻读取,可能会造成误判。

以上就是关于“基于redis乐观锁怎么实现并发排队”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯