关闭

财富坊cff888: Flask和Celery的使用

标签: celeryflask
1979人阅读 评论(3) 收藏 举报
分类:

显示更新状态和结果

事实上对于很多应用来说,有必要监控它的后台任务并且从中获取结果。
我们用一个例子,一个虚构的耗时任务来扩展上面的应用,用户可以通过点击一个按钮启动一个或更多这些长时间运行任务。运行在你的浏览器上的网页通过ajax轮训你的服务获取这些任务的状态更新。对于每个任务,网页会展示图形状态栏,一个完成百分比,一个状态消息,当任务完成时,结果值会被展示。

有状态更新的后台任务

下面是这个例子中使用的后台任务:

@celery.task(bind=True)
def long_task(self):
    """Background task that runs a long function with progress reports."""
    verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
    adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
    noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
    message = ''
    total = random.randint(10, 50)
    for i in range(total):
        if not message or random.random() < 0.25:
            message = '{0} {1} {2}...'.format(random.choice(verb),
                                              random.choice(adjective),
                                              random.choice(noun))
        self.update_state(state='PROGRESS',
                          meta={'current': i, 'total': total,
                                'status': message})
        time.sleep(1)
    return {'current': 100, 'total': 100, 'status': 'Task completed!',
            'result': 42}

这个路由生成一个JSON响应,它包含任务状态和我在update_state()调用中作为元参数设置的所有值,客户端可以用它构建一个进程条。不幸的是这个函数需要检查一些边界条件,所以最后有点长。为了检查任务数据我再创了一个任务对象,一个AsyncResult类的实例,使用URL中给定的任务id。
首先 if 块是当任务还没有开始(悬起状态)。这种情况下没有状态信息,所以我制造一些数据。跟着 elif 块从后台任务中返回状态信息。这里由任务提供的信息作为 task.info 是可访问的。如果数据包含了一个结果键,就意味着这是最终结果并且任务结束,所以我把结果也添加到 responseelse 块覆盖了一个错误的可能性,Celery 会通过设置任务状态为“FAILURE”来报告,这种情况下 task.info 会包含被引发的异常,为了处理错误我设置了异常文本作为一个状态消息。
以上都是发生在服务器端,剩下的需要客户端实现,在本例中就是一个包含JavaScript脚本的网页

客户端 Javascript

解析本例的JavaScript部分不是本文真正的关注点,但如果你感兴趣的话,下面有一些信息。
图形进度条部分我使用了nanobar.js,通过CDN包含进来。同时也包含了jquery,简化了ajax调用:

<script src="//cdnjs.cloudflare.com/ajax/libs/nanobar/0.2.1/nanobar.min.js"></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>

启动后台工作的按钮被关联到下面的Javascript上:

function start_long_task() {
        // add task status elements 
        div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div>&nbsp;</div></div><hr>');
        $('#progress').append(div);

        // create a progress bar
        var nanobar = new Nanobar({
            bg: '#44f',
            target: div[0].childNodes[0]
        });

        // send ajax POST request to start background job
        $.ajax({
            type: 'POST',
            url: '/longtask',
            success: function(data, status, request) {
                status_url = request.getResponseHeader('Location');
                update_progress(status_url, nanobar, div[0]);
            },
            error: function() {
                alert('Unexpected error');
            }
        });
    }

这个函数起始于添加一些用来展示新的后台任务进度条和状态的HTML元素,这是动态的因为用户可以添加任何数量任务,每个任务需要获取它自己的HTML元素集合。

为了帮助大家更好的理解,下面提供了给任务添加的元素的结构,注释指出了每个div的用处:

<div class="progress">
    <div></div>         <-- 进度条
    <div>0%</div>       <-- 百分比
    <div>...</div>      <-- 状态消息
    <div>&nbsp;</div>   <-- 结果
</div>
<hr>

然后 start_long_task()函数按照nanobar的文档实例化这个进度条 ,最后发送ajax POST请求到 /longtask,在服务端初始化Celery 后台任务。

当 POST ajax 调用返回, 回调函数包含了Location header的值,这就像你在前面部分看到的一样是为了客户端状态更新。然后使用这个状态URL, 进度条对象,为任务创建的根div的子树,调用另一个函数update_progress() 。下面你可以看到这个update_progress() 函数,它发送状态请求然后用它返回的信息更新UI元素:

  function update_progress(status_url, nanobar, status_div) {
        // send GET request to status URL
        $.getJSON(status_url, function(data) {
            // update UI
            percent = parseInt(data['current'] * 100 / data['total']);
            nanobar.go(percent);
            $(status_div.childNodes[1]).text(percent + '%');
            $(status_div.childNodes[2]).text(data['status']);
            if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
                if ('result' in data) {
                    // show result
                    $(status_div.childNodes[3]).text('Result: ' + data['result']);
                }
                else {
                    // something unexpected happened
                    $(status_div.childNodes[3]).text('Result: ' + data['state']);
                }
            }
            else {
                // rerun in 2 seconds
                setTimeout(function() {
                    update_progress(status_url, nanobar, status_div);
                }, 2000);
            }
        });
    }

这个函数发送GET请求到状态URL,当一个响应被接收后它为任务更新不同的HTML元素。如果后台任务结束并且结果可用那么它就被添加到页面上。如果没有结果就意味着任务由于错误而结束,所以任务的状态,将会是FAILURE,就像结果所展示的。

当服务端正在执行任务我需要继续轮训任务状态并且更新UI。为实现这个我设置了一个定时器在两秒内来再次调用这个函数。这持续到Celery任务结束。

一个worker运行尽可能多的并发任务,按照默认CPU数。所以当你实验这个例子时确保开启大量任务来查看Celery如何保持任务为挂起状态,直到有worker能够处理它。

运行这个例子

一切准备就绪你就可以运行这个例子了。可以从Github仓库克隆代码,创建一个虚拟环境,激活并安装依赖:

$ git clone https://github.com/miguelgrinberg/flask-celery-example.git
$ cd flask-celery-example
$ virtualenv venv
$ source venv/bin/activate
(venv) $ pip install -r requirements.txt

这个仓库里的requirements.txt 文件包含Flask, Flask-Mail, Celery 和 Redis 客户端, 还有他们的依赖.

Now you need to run the three processes required by this application, 最简单的方式是打开三个终端窗口。第一个终端运行Redis。你可以根据下载说明给你的操作系统安装Redis,但如果用的是linux或者OS X机器,我已经包含了一个小脚本,可以下载,编译和作为私有服务运行Redis:

$ ./run-redis.sh

对于以上脚本你需要安装了gcc。注意以上的命令是阻塞的,Redis会在前台启动。
在第二个终端运行一个Celery工人。这个使用celery的命令,这个On the second terminal run a Celery worker. This is done with the celery command, which is installed in your virtual environment. Since this is the process that will be sending out emails, the MAIL_USERNAME and MAIL_PASSWORD environment variables must be set to a valid Gmail account before starting the worker:

$ export MAIL_USERNAME=<your-gmail-username>
$ export MAIL_PASSWORD=<your-gmail-password>
$ source venv/bin/activate
(venv) $ celery worker -A app.celery --loglevel=info

The -A option gives Celery the application module and the Celery instance, and –loglevel=info makes the logging more verbose, which can sometimes be useful in diagnosing problems.

Finally, on the third terminal window run the Flask application, also from the virtual environment:

$ source venv/bin/activate
(venv) $ python app.py

Now you can navigate to http://localhost:5000/ in your web browser and try the examples!

1
0
查看评论

在 Flask 中使用 Celery

在 Flask 中使用 Celery 后台运行任务的话题是有些复杂,因为围绕这个话题会让人产生困惑。为了简单起见,在以前我所有的例子中,我都是在线程中执行后台任务,但是我一直注意到更具有扩展性以及具备生产解决方案的任务队列像 Celery 应该可以替代线程中执行后台任务。 ...
  • sedrtse
  • sedrtse
  • 2016-12-11 11:07
  • 657

用 Flask 来写个轻博客 (26) — 使用 Flask-Celery-Helper 实现异步任务

目录目录 前文列表 扩展阅读 Celery 将 Celery 加入到应用中 实现向新用户发送欢迎邮件 前文列表用 Flask 来写个轻博客 (1) — 创建项目 用 Flask 来写个轻博客 (2) — Hello World! 用 Flask 来写个轻博客 (3) — (M)VC_连接 MyS...
  • Jmilk
  • Jmilk
  • 2016-12-15 21:48
  • 4946

flask+celery常见问题及解决方法

1、 [root@ansible flask_celery]# celery -A app worker --loglevel=info Traceback (most recent call last):   File "/usr/bin/celery", li...
  • lixingdefengzi
  • lixingdefengzi
  • 2016-06-27 17:15
  • 3604

Celery定时任务

Celery定时任务配置启用Celery的定时任务需要设置CELERYBEAT_SCHEDULE 。 Celery的定时任务都由celery beat来进行调度。celery beat默认按照settings.py之中的时区时间来调度定时任务。创建定时任务一种创建定时任务的方式是配置CELER...
  • sicofield
  • sicofield
  • 2016-03-20 16:59
  • 13334

celery 简单应用及 redis的安装和启动

1.安装redis ①wget http://download.redis.io/releases/redis-3.0.3.tar.gz ②tar xzf redis-3.0.3.tar.gz ③在redis-3.0.3目录下make,后会生成src目录,执行src下面的redis-server即可...
  • u014305812
  • u014305812
  • 2016-08-23 15:02
  • 769

Flask Web 开发 发送异步邮件

继续上一章节的内容,当你点击submit的同时,他会发一封邮件给管理员 但是,他会有短时间几秒钟的无响应,如何避免这样的情况呢? 那就要用到异步邮件 实际上就是交由后来发送这个邮件,算到其他线程里面 代码改动如下 先导入Thread模块 from threading import...
  • bestallen
  • bestallen
  • 2016-08-14 16:20
  • 1102

使用Celery

Celery是一个专注于实时处理和任务调度的分布式任务队列。所谓任务就是消息,消息中的有效载荷中包含要执行任务需要的全部数据。 使用Celery的常见场景如下: 1. Web应用。当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段...
  • dongwm1
  • dongwm1
  • 2016-09-08 15:42
  • 1188

celery、rabbitmq的使用

最近同事项目想使用celery与rabbitmq来做任务调度,让我做一次这方面的使用分享。工作之余大致整理了一下。 一、先介绍一下rabbitmq消息队列 ?AMQP,即AdvancedMessage Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的...
  • nxgych
  • nxgych
  • 2016-11-11 14:23
  • 1574

Celery和Flask的结合(在Blueprint控制下的部署)

方便他人,也为了记录知识备注:前提是已经按照好了celery,为什么不用RabbitMQ呢? 因为我用的是linux,发现RabbitMQ并不是python语言编写的,安装十分麻烦(个人感觉) #接下来是正文# 首先是项目的目录预览(只列出来了相关的模块) Proj/ &#...
  • wr166
  • wr166
  • 2017-12-05 11:07
  • 98

flask使用celery定时器

celery
  • q527641488
  • q527641488
  • 2017-06-15 15:38
  • 551