我就廢話不多說了,直接上代碼吧!
""" 對于每天存儲文件,文件數量過多,占用空間 采用保存最新的三個文件 """ from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.models import Variable from sctetl.airflow.utils import dateutils from datetime import datetime,timedelta import logging import os import shutil """ base_dir = "/data" data_dir = "/gather" "gather下邊存在不同的文件夾" "/data/gather/test" "test路徑下有以下文件夾" "20180812、20180813、20180814、20180815、20180816" """ base_dir = Variable.get("base_dir") data_dir = Variable.get("data_dir") keep = 3 default_arg = { "owner":"airflow", "depends_on_past":False, "start_date":dateutils.get_start_date_local(2018,8,27,18,5), "email":[''], "email_on_failure":False, "email_on_retry":False, "retries":1, "retry_delay":timedelta(minutes=5) } dag = DAG(dag_id="keep_three_day",default_args=default_arg,schedule_interval=dateutils.get_schedule_interval_local(18,5)) def keep_three_day(): path = os.path.join(base_dir, data_dir) date_cates = os.listdir(path) for cate in date_cates: p = os.path.join(base_dir, data_dir, cate) if os.path.isdir(p): dir_names = os.listdir(p) dir_names.sort() for i in dir_names[:-keep]: logging.info("刪除目錄 {path}".format(path=os.path.join(p, i))) shutil.rmtree(os.path.join(p, i)) with dag: keep_three_file = PythonOperator(task_id="keep_three_file",python_callable=keep_three_day(),dag=dag) keep_three_file
名稱欄目:python實現保存最新的三份文件,其余的都刪掉-創新互聯
網站網址:http://vcdvsql.cn/article18/ddjigp.html
成都網站建設公司_創新互聯,為您提供微信小程序、外貿建站、響應式網站、域名注冊、網站制作、標簽優化
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯