本篇文章為大家展示了利用HDFS怎么實(shí)現(xiàn)多文件Join操作,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。
鄞州網(wǎng)站制作公司哪家好,找成都創(chuàng)新互聯(lián)公司!從網(wǎng)頁設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)公司等網(wǎng)站項(xiàng)目制作,到程序開發(fā),運(yùn)營(yíng)維護(hù)。成都創(chuàng)新互聯(lián)公司于2013年成立到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選成都創(chuàng)新互聯(lián)公司。
詳解HDFS多文件Join操作的實(shí)例
最近在做HDFS文件處理之時(shí),遇到了多文件Join操作,其中包括:All Join以及常用的Left Join操作,
下面是個(gè)簡(jiǎn)單的例子;采用兩個(gè)表來做left join其中數(shù)據(jù)結(jié)構(gòu)如下:
A 文件:
a|1b|2|c
B文件:
a|b|1|2|c
即:A文件中的第一、二列與B文件中的第一、三列對(duì)應(yīng);類似數(shù)據(jù)庫中Table的主鍵/外鍵
代碼如下:
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase; import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import cn.eshore.traffic.hadoop.util.CommUtil; import cn.eshore.traffic.hadoop.util.StringUtil; /** * @ClassName: DataJoin * @Description: HDFS JOIN操作 * @author hadoop * @date 2012-12-18 下午5:51:32 */ public class InstallJoin extends Configured implements Tool { private String static enSplitCode = "\\|"; private String static splitCode = "|"; // 自定義Reducer public static class ReduceClass extends DataJoinReducerBase { @Override protected TaggedMapOutput combine(Object[] tags, Object[] values) { String joinedStr = ""; //該段判斷用戶生成Left join限制【其中tags表示文件的路徑,install表示文件名稱前綴】 //去掉則為All Join if (tags.length == 1 && tags[0].toString().contains("install")) { return null; } Map<String, String> map = new HashMap<String, String>(); for (int i = 0; i < values.length; i++) { TaggedWritable tw = (TaggedWritable) values[i]; String line = ((Text) tw.getData()).toString(); String[] tokens = line.split(enSplitCode, 8); String groupValue = tokens[6]; String type = tokens[7]; map.put(type, groupValue); } joinedStr += StringUtil.getCount(map.get("7"))+"|"+StringUtil.getCount(map.get("30")); TaggedWritable retv = new TaggedWritable(new Text(joinedStr)); retv.setTag((Text) tags[0]); return retv; } } // 自定義Mapper public static class MapClass extends DataJoinMapperBase { //自定義Key【類似數(shù)據(jù)庫中的主鍵/外鍵】 @Override protected Text generateGroupKey(TaggedMapOutput aRecord) { String line = ((Text) aRecord.getData()).toString(); String[] tokens = line.split(CommUtil.enSplitCode); String key = ""; String type = tokens[7]; //由于不同文件中的Key所在列有可能不同,所以需要?jiǎng)討B(tài)生成Key,其中type為不同文件中的數(shù)據(jù)標(biāo)識(shí);如:A文件最后一列為a用于表示此數(shù)據(jù)為A文件數(shù)據(jù) if ("7".equals(type)) { key = tokens[0]+"|"+tokens[1]; }else if ("30".equals(type)) { key = tokens[0]+"|"+tokens[2]; } return new Text(key); } @Override protected Text generateInputTag(String inputFile) { return new Text(inputFile); } @Override protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedWritable retv = new TaggedWritable((Text) value); retv.setTag(this.inputTag); return retv; } } public static class TaggedWritable extends TaggedMapOutput { private Writable data; // 自定義 public TaggedWritable() { this.tag = new Text(""); } public TaggedWritable(Writable data) { this.tag = new Text(""); this.data = data; } @Override public Writable getData() { return data; } @Override public void write(DataOutput out) throws IOException { this.tag.write(out); out.writeUTF(this.data.getClass().getName()); this.data.write(out); } @Override public void readFields(DataInput in) throws IOException { this.tag.readFields(in); String dataClz = in.readUTF(); if (this.data == null || !this.data.getClass().getName().equals(dataClz)) { try { this.data = (Writable) ReflectionUtils.newInstance( Class.forName(dataClz), null); } catch (ClassNotFoundException e) { e.printStackTrace(); } } this.data.readFields(in); } } /** * job運(yùn)行 */ @Override public int run(String[] paths) throws Exception { int no = 0; try { Configuration conf = getConf(); JobConf job = new JobConf(conf, InstallJoin.class); FileInputFormat.setInputPaths(job, new Path(paths[0])); FileOutputFormat.setOutputPath(job, new Path(paths[1])); job.setJobName("join_data_test"); job.setMapperClass(MapClass.class); job.setReducerClass(ReduceClass.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TaggedWritable.class); job.set("mapred.textoutputformat.separator", CommUtil.splitCode); JobClient.runJob(job); no = 1; } catch (Exception e) { throw new Exception(); } return no; } //測(cè)試 public static void main(String[] args) { String[] paths = { "hdfs://master...:9000/home/hadoop/traffic/join/newtype", "hdfs://master...:9000/home/hadoop/traffic/join/newtype/output" } int res = 0; try { res = ToolRunner.run(new Configuration(), new InstallJoin(), paths); } catch (Exception e) { e.printStackTrace(); } System.exit(res); } }
上述內(nèi)容就是利用HDFS怎么實(shí)現(xiàn)多文件Join操作,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。
網(wǎng)站標(biāo)題:利用HDFS怎么實(shí)現(xiàn)多文件Join操作
鏈接URL:http://vcdvsql.cn/article30/iihdpo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制網(wǎng)站、Google、靜態(tài)網(wǎng)站、品牌網(wǎng)站設(shè)計(jì)、網(wǎng)站設(shè)計(jì)公司、網(wǎng)站設(shè)計(jì)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)