本篇內(nèi)容介紹了“如何使用Mars Remote API執(zhí)行Python函數(shù)”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
為蚌山等地區(qū)用戶(hù)提供了全套網(wǎng)頁(yè)設(shè)計(jì)制作服務(wù),及蚌山網(wǎng)站建設(shè)行業(yè)解決方案。主營(yíng)業(yè)務(wù)為成都網(wǎng)站設(shè)計(jì)、做網(wǎng)站、蚌山網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專(zhuān)業(yè)、用心的態(tài)度為用戶(hù)提供真誠(chéng)的服務(wù)。我們深信只要達(dá)到每一位用戶(hù)的要求,就會(huì)得到認(rèn)可,從而選擇與我們長(zhǎng)期合作。這樣,我們也可以走得更遠(yuǎn)!
Mars 是一個(gè)并行和分布式 Python 框架,能輕松把單機(jī)大家耳熟能詳?shù)牡?numpy、pandas、scikit-learn 等庫(kù),以及 Python 函數(shù)利用多核或者多機(jī)加速。這其中,并行和分布式 Python 函數(shù)主要利用 Mars Remote API。
啟動(dòng) Mars 分布式環(huán)境可以參考:
命令行方式在集群中部署。
Kubernetes 中部署。
MaxCompute 開(kāi)箱即用的環(huán)境,購(gòu)買(mǎi)了 MaxCompute 服務(wù)的可以直接使用。
使用 Mars Remote API 非常簡(jiǎn)單,只需要對(duì)原有的代碼做少許改動(dòng),就可以分布式執(zhí)行。
拿用蒙特卡洛方法計(jì)算 π 為例。代碼如下,我們編寫(xiě)了兩個(gè)函數(shù),calc_chunk
用來(lái)計(jì)算每個(gè)分片內(nèi)落在圓內(nèi)的點(diǎn)的個(gè)數(shù),calc_pi
用來(lái)把多個(gè)分片 calc_chunk
計(jì)算的結(jié)果匯總最后得出 π 值。
from typing import Listimport numpy as npdef calc_chunk(n: int, i: int):# 計(jì)算n個(gè)隨機(jī)點(diǎn)(x和y軸落在-1到1之間)到原點(diǎn)距離小于1的點(diǎn)的個(gè)數(shù)rs = np.random.RandomState(i) a = rs.uniform(-1, 1, size=(n, 2)) d = np.linalg.norm(a, axis=1)return (d < 1).sum()def calc_pi(fs: List[int], N: int):# 將若干次 calc_chunk 計(jì)算的結(jié)果匯總,計(jì)算 pi 的值return sum(fs) * 4 / N N = 200_000_000 n = 10_000_000 fs = [calc_chunk(n, i) for i in range(N // n)] pi = calc_pi(fs, N) print(pi)
%%time
下可以看到結(jié)果:
3.1416312 CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s Wall time: 12.3 s
在單機(jī)需要 12.3 s。
要讓這個(gè)計(jì)算使用 Mars Remote API 并行起來(lái),我們不需要對(duì)函數(shù)做任何改動(dòng),需要變動(dòng)的僅僅是最后部分。
import mars.remote as mr# 函數(shù)調(diào)用改成 mars.remote.spawnfs = [mr.spawn(calc_chunk, args=(n, i)) for i in range(N // n)]# 把 spawn 的列表傳入作為參數(shù),再 spawn 新的函數(shù)pi = mr.spawn(calc_pi, args=(fs, N))# 通過(guò) execute() 觸發(fā)執(zhí)行,fetch() 獲取結(jié)果print(pi.execute().fetch())
%%time
下看到結(jié)果:
3.1416312 CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms Wall time: 2.85 s
結(jié)果一模一樣,但是卻有數(shù)倍的性能提升。
可以看到,對(duì)已有的 Python 代碼,Mars remote API 幾乎不需要做多少改動(dòng),就能有效并行和分布式來(lái)加速執(zhí)行過(guò)程。
為了讓讀者理解 Mars Remote API 的作用,我們從另一個(gè)例子開(kāi)始?,F(xiàn)在我們有一個(gè)數(shù)據(jù)集,我們希望對(duì)它們做一個(gè)分類(lèi)任務(wù)。要做分類(lèi),我們有很多算法和庫(kù)可以選擇,這里我們用 RandomForest、LogisticRegression,以及 XGBoost。
困難的地方是,除了有多個(gè)模型選擇,這些模型也會(huì)包含多個(gè)超參,那哪個(gè)超參效果最好呢?對(duì)于調(diào)參不那么有經(jīng)驗(yàn)的同學(xué),跑過(guò)了才知道。所以,我們希望能生成一堆可選的超參,然后把他們都跑一遍,看看效果。
這個(gè)例子里我們使用 otto 數(shù)據(jù)集。
首先,我們準(zhǔn)備數(shù)據(jù)。讀取數(shù)據(jù)后,我們按 2:1 的比例把數(shù)據(jù)分成訓(xùn)練集和測(cè)試集。
import pandas as pdfrom sklearn.preprocessing import LabelEncoderfrom sklearn.model_selection import train_test_splitdef gen_data():df = pd.read_csv('otto/train.csv') X = df.drop(['target', 'id'], axis=1) y = df['target'] label_encoder = LabelEncoder() label_encoder.fit(y) y = label_encoder.transform(y) return train_test_split(X, y, test_size=0.33, random_state=123) X_train, X_test, y_train, y_test = gen_data()
接著,我們使用 scikit-learn 的 RandomForest 和 LogisticRegression 來(lái)處理分類(lèi)。
RandomForest:
from sklearn.ensemble import RandomForestClassifierdef random_forest(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw):model = RandomForestClassifier(verbose=verbose, **kw) model.fit(X_train, y_train)return model
接著,我們生成供 RandomForest 使用的超參,我們用 yield 的方式來(lái)迭代返回。
def gen_random_forest_parameters():for n_estimators in [50, 100, 600]:for max_depth in [None, 3, 15]:for criterion in ['gini', 'entropy']:yield {'n_estimators': n_estimators,'max_depth': max_depth,'criterion': criterion }
LogisticRegression 也是這個(gè)過(guò)程。我們先定義模型。
from sklearn.linear_model import LogisticRegressiondef logistic_regression(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw):model = LogisticRegression(verbose=verbose, **kw) model.fit(X_train, y_train)return model
接著生成供 LogisticRegression 使用的超參。
def gen_lr_parameters():for penalty in ['l2', 'none']:for tol in [0.1, 0.01, 1e-4]:yield {'penalty': penalty,'tol': tol }
XGBoost 也是一樣,我們用 XGBClassifier
來(lái)執(zhí)行分類(lèi)任務(wù)。
from xgboost import XGBClassifierdef xgb(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw):model = XGBClassifier(verbosity=int(verbose), **kw) model.fit(X_train, y_train)return model
生成一系列超參。
def gen_xgb_parameters():for n_estimators in [100, 600]:for criterion in ['gini', 'entropy']:for learning_rate in [0.001, 0.1, 0.5]:yield {'n_estimators': n_estimators,'criterion': criterion,'learning_rate': learning_rate }
接著我們編寫(xiě)驗(yàn)證邏輯,這里我們使用 log_loss
來(lái)作為評(píng)價(jià)函數(shù)。
from sklearn.metrics import log_lossdef metric_model(model, X_test: pd.DataFrame, y_test: pd.Series) -> float:if isinstance(model, bytes): model = pickle.loads(model) y_pred = model.predict_proba(X_test)return log_loss(y_test, y_pred)def train_and_metric(train_func, train_params: dict, X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, verbose: bool = False ):# 把訓(xùn)練和驗(yàn)證封裝到一起model = train_func(X_train, y_train, verbose=verbose, **train_params) metric = metric_model(model, X_test, y_test)return model, metric
做好準(zhǔn)備工作后,我們就開(kāi)始來(lái)跑模型了。針對(duì)每個(gè)模型,我們把每次生成的超參們送進(jìn)去訓(xùn)練,除了這些超參,我們還把 n_jobs
設(shè)成 -1,這樣能更好利用單機(jī)的多核。
results = []# -------------# Random Forest# -------------for params in gen_random_forest_parameters(): print(f'calculating on {params}')# fixed random_stateparams['random_state'] = 123# use all CPU coresparams['n_jobs'] = -1model, metric = train_and_metric(random_forest, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric}) # -------------------# Logistic Regression# -------------------for params in gen_lr_parameters(): print(f'calculating on {params}')# fixed random_stateparams['random_state'] = 123# use all CPU coresparams['n_jobs'] = -1model, metric = train_and_metric(logistic_regression, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric}) # -------# XGBoost# -------for params in gen_xgb_parameters(): print(f'calculating on {params}')# fixed random_stateparams['random_state'] = 123# use all CPU coresparams['n_jobs'] = -1model, metric = train_and_metric(xgb, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric})
運(yùn)行一下,需要相當(dāng)長(zhǎng)時(shí)間,我們省略掉一部分輸出內(nèi)容。
calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'gini'} metric: 0.6964123781828575calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'entropy'} metric: 0.6912312790832288# 省略其他模型的輸出結(jié)果CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s Wall time: 31min 44s
從 CPU 時(shí)間和 Wall 時(shí)間,能看出來(lái)這些訓(xùn)練還是充分利用了多核的性能。但整個(gè)過(guò)程還是花費(fèi)了 31 分鐘。
現(xiàn)在我們嘗試使用 Remote API 通過(guò)分布式方式加速整個(gè)過(guò)程。
集群方面,我們使用最開(kāi)始說(shuō)的第三種方式,直接在 MaxCompute 上拉起一個(gè)集群。大家可以選擇其他方式,效果是一樣的。
n_cores = 8mem = 2 * n_cores # 16G# o 是 MaxCompute 入口,這里創(chuàng)建 10 個(gè) worker 的集群,每個(gè) worker 8核16Gcluster = o.create_mars_cluster(10, n_cores, mem, image='extended')
為了方便在分布式讀取數(shù)據(jù),我們對(duì)數(shù)據(jù)處理稍作改動(dòng),把數(shù)據(jù)上傳到 MaxCompute 資源。對(duì)于其他環(huán)境,用戶(hù)可以考慮 HDFS、Aliyun OSS 或者 Amazon S3 等存儲(chǔ)。
if not o.exist_resource('otto_train.csv'):with open('otto/train.csv') as f:# 上傳資源o.create_resource('otto_train.csv', 'file', fileobj=f) def gen_data():# 改成從資源讀取df = pd.read_csv(o.open_resource('otto_train.csv')) X = df.drop(['target', 'id'], axis=1) y = df['target'] label_encoder = LabelEncoder() label_encoder.fit(y) y = label_encoder.transform(y) return train_test_split(X, y, test_size=0.33, random_state=123)
稍作改動(dòng)之后,我們使用 mars.remote.spawn
方法來(lái)讓 gen_data
調(diào)度到集群上運(yùn)行。
import mars.remote as mr# n_output 說(shuō)明是 4 輸出# execute() 執(zhí)行后,數(shù)據(jù)會(huì)讀取到 Mars 集群內(nèi)部data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute()# remote_ 開(kāi)頭的都是 Mars 對(duì)象,這時(shí)候數(shù)據(jù)在集群內(nèi),這些對(duì)象只是引用remote_X_train, remote_X_test, remote_y_train, remote_y_test = data
目前 Mars 能正確序列化 numpy ndarray、pandas DataFrame 等,還不能序列化模型,所以,我們要對(duì) train_and_metric
稍作改動(dòng),把模型 pickle 了之后再返回。
def distributed_train_and_metric(train_func, train_params: dict, X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, verbose: bool = False ):model, metric = train_and_metric(train_func, train_params, X_train, y_train, X_test, y_test, verbose=verbose)return pickle.dumps(model), metric
后續(xù) Mars 支持了序列化模型后可以直接 spawn 原本的函數(shù)。
接著我們就對(duì)前面的執(zhí)行過(guò)程稍作改動(dòng),把函數(shù)調(diào)用全部都用 mars.remote.spawn
來(lái)改寫(xiě)。
import numpy as np tasks = [] models = [] metrics = []# -------------# Random Forest# -------------for params in gen_random_forest_parameters():# fixed random_stateparams['random_state'] = 123task = mr.spawn(distributed_train_and_metric, args=(random_forest, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2) tasks.extend(task)# 把模型和評(píng)價(jià)分別存儲(chǔ)models.append(task[0]) metrics.append(task[1]) # -------------------# Logistic Regression# -------------------for params in gen_lr_parameters():# fixed random_stateparams['random_state'] = 123task = mr.spawn(distributed_train_and_metric, args=(logistic_regression, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2) tasks.extend(task)# 把模型和評(píng)價(jià)分別存儲(chǔ)models.append(task[0]) metrics.append(task[1])# -------# XGBoost# -------for params in gen_xgb_parameters():# fixed random_stateparams['random_state'] = 123# 再指定并發(fā)為核的個(gè)數(shù)params['n_jobs'] = n_cores task = mr.spawn(distributed_train_and_metric, args=(xgb, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2) tasks.extend(task)# 把模型和評(píng)價(jià)分別存儲(chǔ)models.append(task[0]) metrics.append(task[1])# 把順序打亂,目的是能分散到 worker 上平均一點(diǎn)shuffled_tasks = np.random.permutation(tasks) _ = mr.ExecutableTuple(shuffled_tasks).execute()
可以看到代碼幾乎一致。
運(yùn)行查看結(jié)果:
CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms Wall time: 1min 59s
時(shí)間一下子從 31 分鐘多來(lái)到了 2 分鐘,提升 15x+。但代碼修改的代價(jià)可以忽略不計(jì)。
細(xì)心的讀者可能注意到了,分布式運(yùn)行的代碼中,我們把模型的 verbose 給打開(kāi)了,在分布式環(huán)境下,因?yàn)檫@些函數(shù)遠(yuǎn)程執(zhí)行,打印的內(nèi)容只會(huì)輸出到 worker 的標(biāo)準(zhǔn)輸出流,我們?cè)诳蛻?hù)端不會(huì)看到打印的結(jié)果,但 Mars 提供了一個(gè)非常有用的接口來(lái)讓我們查看每個(gè)模型運(yùn)行時(shí)的輸出。
以第0個(gè)模型為例,我們可以在 Mars 對(duì)象上直接調(diào)用 fetch_log
方法。
print(models[0].fetch_log())
輸出我們簡(jiǎn)略一部分。
building tree 1 of 50building tree 2 of 50building tree 3 of 50building tree 4 of 50building tree 5 of 50building tree 6 of 50# 中間省略building tree 49 of 50building tree 50 of 50
要看哪個(gè)模型都可以通過(guò)這種方式。試想下,如果沒(méi)有 fetch_log
API,你確想看中間過(guò)程的輸出有多麻煩。首先這個(gè)函數(shù)在哪個(gè) worker 上執(zhí)行,不得而知;然后,即便知道是哪個(gè) worker,因?yàn)槊總€(gè) worker 上可能有多個(gè)函數(shù)執(zhí)行,這些輸出就可能混雜在一起,甚至被龐大日志淹沒(méi)了。fetch_log
接口讓用戶(hù)不需要關(guān)心在哪個(gè) worker 上執(zhí)行,也不用擔(dān)心日志混合在一起。
想要了解 fetch_log
接口,可以查看 文檔。
“如何使用Mars Remote API執(zhí)行Python函數(shù)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
分享名稱(chēng):如何使用MarsRemoteAPI執(zhí)行Python函數(shù)
網(wǎng)頁(yè)地址:http://vcdvsql.cn/article34/jhjhse.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供動(dòng)態(tài)網(wǎng)站、定制網(wǎng)站、做網(wǎng)站、網(wǎng)站策劃、建站公司、虛擬主機(jī)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)