目錄:
成都創新互聯公司是一家專業提供海興企業網站建設,專注與成都做網站、網站建設、H5高端網站建設、小程序制作等業務。10年已為海興眾多企業、政府機構等服務。創新互聯專業網站制作公司優惠進行中。
1.?Django集成Celery
2.?聲明異步任務
3.?封裝工具task_util.py
4.?單元測試test_task_util.py
5.?創建異步任務
6.?常見問題和解決方法
Celery是一個靈活可靠的分布式系統,用于異步任務調度,主要有3部分組成:消息中間件broker,任務執行單元worker,執行結果存儲task result store。Celery使用第三方消息中間件redis,RabbitMQ等。
?
系統通常將一些耗時的操作任務提交給Celery去異步執行,典型架構示意圖如下。本文詳細介紹Django集成Celery配置方法和功能測試。
時序圖如下:
?
示例代碼:https://github.com/rickding/HelloPython/tree/master/hello_celery
├──__init__.py
├── settings.py
├── celery.py
├── tasks.py
├── util
│ ??└── task_util.py
├── tests
│ ??└── test_task_util.py
一,Django集成Celery
代碼文件 | 功能要點 | |
Django集成Celery | requirements.txt | 安裝Celery, Redis和工具包: celery == 4.2.1 flower == 0.9.2 redis == 3.2.0 eventlet == 0.24.1 |
celery.py | 配置Celery,依賴的消息中間件broker和后端backend地址配置在settings.py中集中維護。 | |
__init__.py | 配置項目加載celery.app | |
聲明異步任務 | tasks.py | 聲明Celery可調度的任務@shared_task |
封裝工具task_util | task_util.py | 異步任務創建和分發 |
單元測試 | test_task_util.py | 測試異步任務創建和分發功能 |
創建異步任務 | views.py | 增加REST接口/chk/job |
1.?新建Django項目,運行:django-admin startproject hello_celery
2.?進到目錄hello_celery,增加應用:python manage.py startapp app
項目的目錄文件結構如下:
3.?安裝Celery和依賴包,pip install celery >= 4.2.1,如果不是新建項目,注意版本兼容問題。
celery == 4.2.1
flower == 0.9.2
redis == 3.2.0
eventlet == 0.24.1
4.?增加celery.py,配置信息:
from__future__importabsolute_import,unicode_literals
importos
fromceleryimportCelery,platforms
fromdjango.confimportsettings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE','hello_celery.settings')
app = Celery(
????'hello_celery',
????include=['hello_celery.tasks'],
????broker=settings.CELERY_BROKER,
????backend=settings.CELERY_BACKEND
)
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# ??should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings',namespace='CELERY')
app.conf.update(
????CELERY_ACKS_LATE=True,
????CELERY_ACCEPT_CONTENT=['pickle','json'],
????CELERYD_FORCE_EXECV=True,
????CELERYD_MAX_TASKS_PER_CHILD=500,
????BROKER_HEARTBEAT=0,
)
# Optional configuration, see the application user guide.
app.conf.update(
????CELERY_TASK_RESULT_EXPIRES=3600, ?# celery任務執行結果的超時時間,即結果在backend里的保存時間,單位s
)
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
platforms.C_FORCE_ROOT =True
5.?打開settings.py,配置BROKER和BACKEND地址:
CELERY_BROKER ='redis://127.0.0.1:6379/2'
CELERY_BACKEND ='redis://127.0.0.1:6379/3'
6.?打開__init__.py,增加代碼:
from__future__importabsolute_import,unicode_literals
from.celeryimportappascelery_app
__all__ = ['celery_app']
二,增加tasks.py,聲明異步任務
from__future__importabsolute_import,unicode_literals
importlogging
importjson
fromceleryimportshared_task
fromhello_celery.util.task_utilimportdispatch_task
log = logging.getLogger(__name__)
@shared_task
deftask(param_str):
????log.info('task starts: %s, %s'% (type(param_str),param_str))
????param_dict =None
????try:
????????param_dict = json.loads(param_str)
????exceptExceptionase:
????????log.warning('Exception when parse param: %s'%str(e))
????log.info('parsed param: {}, {}'.format(type(param_dict),param_dict))
????return'finished'
正確配置后,運行命令:celery -A hello_celery worker -l info -P eventlet,注意Win10環境中需要增加eventlet,Celery成功啟動信息:
?
三,封裝工具task_util.py
封裝兩個有用的工具函數,分別用于分發(創建)異步任務和獲取任務信息:
importlogging
importjson
log = logging.getLogger(__name__)
defdispatch_task(task_func,param_dict):
????param_json = json.dumps(param_dict)
????try:
????????returntask_func.apply_async(
????????????[param_json],
????????????retry=True,
????????????retry_policy={
????????????????'max_retries':1,
????????????????'interval_start':0,
????????????????'interval_step':0.2,
????????????????'interval_max':0.2,
????????????},
????????)
????exceptExceptionasex:
????????log.info(ex)
????????raise
defget_task_status(task_func,task_id):
????t = task_func.AsyncResult(task_id)
????status = t.state
????progress =0
????ifstatus ==u'SUCCESS':
????????progress =100
????elifstatus ==u'FAILURE':
????????progress =0
????elifstatus =='PROGRESS':
????????progress = t.info['progress']
????return{'status': status,'progress': progress}
四,單元測試test_task_util.py
增加測試函數,創建一個任務任務并獲取信息:
importlogging
fromdjango.testimportTestCase
fromhello_celery.tasksimporttask
fromhello_celery.util.task_utilimportdispatch_task,get_task_status
log = logging.getLogger(__name__)
classTasksTest(TestCase):
????deftest_get_task_status(self):
????????t = dispatch_task(task,{'msg':'test_task'})
????????self.assertIsNotNone(t)
????????ret = get_task_status(task,t.id)
????????log.info('task status: %s,%s, %s'% (ret,t.id,str(task)))
????????self.assertIsNotNone(ret.get('status'))
運行python manage.py test,同時Celery將執行測試函數創建的任務:
五,創建異步任務
1.?在views.py中增加請求處理函數,創建一個異步執行的任務:
fromdjango.httpimportJsonResponse
fromhello_celery.tasksimportdo_task
defchk_job(req):
????param_dict = {
????????'url': req.get_raw_uri(),
????????'path': req.get_full_path(),
????}
????job = do_task(param_dict)
????returnJsonResponse({'code':0,'msg':'success','job': job.task_id})
2.?在urls.py中配置路由
fromdjango.urlsimportpath
fromapp.viewsimportchk_job
urlpatterns = [
????path('',chk_job,name='chk'),
]
3.?運行命令啟動服務:python manage.py runserver 0.0.0.0:8001
4.?REST接口創建異步任務示例
?
六,常見問題和解決方法
1.?啟動Celery:celery -A hello_celery worker -l info,運行出錯:
Unrecoverable error: VersionMismatch('Redis transport requires redis-py versions 3.2.0 or later. You have 2.10.6',)
解決:指定Redis使用3.2.0或更高pip install redis>=3.2.0
當前名稱:【從0開始Python開發實戰】Django集成Celery
分享URL:http://vcdvsql.cn/article44/gdiohe.html
成都網站建設公司_創新互聯,為您提供響應式網站、虛擬主機、定制開發、搜索引擎優化、ChatGPT、定制網站
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯