Hadoop基本流程


一个图片太大了，只好分割成为两部分。根据流程图来说一下具体一个任务执行的情况。
- 在分布式环境中客户端创建任务并提交。
- InputFormat做Map前的预处理，主要负责以下工作：
- 验证输入的格式是否符合JobConfig的输入定义，这个在实现Map和构建Conf的时候就会知道，不定义可以是Writable的任意子类。
- 将input的文件切分为逻辑上的输入InputSplit，其实这就是在上面提到的在分布式文件系统中blocksize是有大小限制的，因此大文件会被划分为多个block。
- 通过RecordReader来再次处理inputsplit为一组records，输出给Map。（inputsplit只是逻辑切分的第一步，但是如何根据文件中的信息来切分还需要RecordReader来实现，例如最简单的默认方式就是回车换行的切分）
- RecordReader处理后的结果作为Map的输入，Map执行定义的Map逻辑，输出处理后的key和value对应到临时中间文件。
- Combiner可选择配置，主要作用是在每一个Map执行完分析以后，在本地优先作Reduce的工作，减少在Reduce过程中的数据传输量。
- Partitioner可选择配置，主要作用是在多个Reduce的情况下，指定Map的结果由某一个Reduce处理，每一个Reduce都会有单独的输出文件。（后面的代码实例中有介绍使用场景）
- Reduce执行具体的业务逻辑，并且将处理结果输出给OutputFormat。
- OutputFormat的职责是，验证输出目录是否已经存在，同时验证输出结果类型是否如Config中配置，最后输出Reduce汇总后的结果。
业务场景和代码范例
业务场景描述：可设定输入和输出路径（操作系统的路径非HDFS路径），根据访问日志分析某一个应用访问某一个API的总次数和总流量，统计后分别输出到两个文件中。这里仅仅为了测试，没有去细分很多类，将所有的类都归并于一个类便于说明问题。

测试代码类图
LogAnalysiser就是主类，主要负责创建、提交任务，并且输出部分信息。内部的几个子类用途可以参看流程中提到的角色职责。具体地看看几个类和方法的代码片断：
LogAnalysiser::MapClass
聽聽聽 public static class MapClass extends MapReduceBase
聽聽聽 聽聽聽 implements Mapper<LongWritable, Text, Text, LongWritable>
聽聽聽 {
聽聽聽 聽聽聽 public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)
聽聽聽 聽聽聽 聽聽聽 聽聽聽 throws IOException
聽聽聽 聽聽聽 {聽聽聽
聽聽聽 聽聽聽 聽聽聽 String line = value.toString();//娌℃湁閰嶇疆RecordReader锛屾墍浠ラ粯璁ら噰鐢╨ine鐨勫疄鐜帮紝key灏辨槸琛屽彿锛寁alue灏辨槸琛屽唴瀹
聽聽聽 聽聽聽 聽聽聽 if (line == null || line.equals(""))
聽聽聽 聽聽聽 聽聽聽 聽聽聽 return;
聽聽聽 聽聽聽 聽聽聽 String[] words = line.split(",");
聽聽聽 聽聽聽 聽聽聽 if (words == null || words.length <
聽聽聽 聽聽聽 聽聽聽 聽聽聽 return;
聽聽聽 聽聽聽 聽聽聽 String appid = words[1];
聽聽聽 聽聽聽 聽聽聽 String apiName = words[2];
聽聽聽 聽聽聽 聽聽聽 LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));
聽聽聽 聽聽聽 聽聽聽 Text record = new Text();
聽聽聽 聽聽聽 聽聽聽 record.set(new StringBuffer("flow::").append(appid)
聽聽聽 聽聽聽 聽聽聽 聽聽聽 聽聽聽 聽聽聽 聽聽聽 .append("::").append(apiName).toString());
聽聽聽 聽聽聽 聽聽聽 reporter.progress();
聽聽聽 聽聽聽 聽聽聽 output.collect(record, recbytes);//杈撳嚭娴侀噺鐨勭粺璁$粨鏋滐紝閫氳繃flow::浣滀负鍓嶇紑鏉ユ爣绀恒
聽聽聽 聽聽聽 聽聽聽 record.clear();
聽聽聽 聽聽聽 聽聽聽 record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());
聽聽聽 聽聽聽 聽聽聽 output.collect(record, new LongWritable(1));//杈撳嚭娆℃暟鐨勭粺璁$粨鏋滐紝閫氳繃count::浣滀负鍓嶇紑鏉ユ爣绀
聽聽聽 聽聽聽 }聽聽聽
聽聽聽 }
LogAnalysiser:: PartitionerClass
聽聽聽 public static class PartitionerClass implements Partitioner<Text, LongWritable>
聽聽聽 {
聽聽聽 聽聽聽 public int getPartition(Text key, LongWritable value, int numPartitions)
聽聽聽 聽聽聽 {
聽聽聽 聽聽聽 聽聽聽 if (numPartitions >= 2)//Reduce 涓暟锛屽垽鏂祦閲忚繕鏄鏁扮殑缁熻鍒嗛厤鍒颁笉鍚岀殑Reduce
聽聽聽 聽聽聽 聽聽聽 聽聽聽 if (key.toString().startsWith("flow::"))
聽聽聽 聽聽聽 聽聽聽 聽聽聽 聽聽聽 return 0;
聽聽聽 聽聽聽 聽聽聽 聽聽聽 else
聽聽聽 聽聽聽 聽聽聽 聽聽聽 聽聽聽 return 1;
聽聽聽 聽聽聽 聽聽聽 else
聽聽聽 聽聽聽 聽聽聽 聽聽聽 return 0;
聽聽聽 聽聽聽 }
聽聽聽 聽聽聽 public void configure(JobConf job){}聽聽聽
}
LogAnalysiser:: CombinerClass
参看ReduceClass，通常两者可以使用一个，不过这里有些不同的处理就分成了两个。在ReduceClass中蓝色的行表示在CombinerClass中不存在。
LogAnalysiser:: ReduceClass
聽聽聽 public static class ReduceClass extends MapReduceBase
聽聽聽 聽聽聽 implements Reducer<Text, LongWritable,Text, LongWritable>
聽聽聽 {
聽聽聽 聽聽聽 public void reduce(Text key, Iterator<LongWritable> values,
聽聽聽 聽聽聽 聽聽聽 聽聽聽 OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException
聽聽聽 聽聽聽 {
聽聽聽 聽聽聽 聽聽聽 Text newkey = new Text();
聽聽聽 聽聽聽 聽聽聽 newkey.set(key.toString().substring(key.toString().indexOf("::")+2));
聽聽聽 聽聽聽 聽聽聽 LongWritable result = new LongWritable();
聽聽聽 聽聽聽 聽聽聽 long tmp = 0;
聽聽聽 聽聽聽 聽聽聽 int counter = 0;
聽聽聽 聽聽聽 聽聽聽 while(values.hasNext())//绱姞鍚屼竴涓猭ey鐨勭粺璁$粨鏋
聽聽聽 聽聽聽 聽聽聽 {
聽聽聽 聽聽聽 聽聽聽 聽聽聽 tmp = tmp + values.next().get();
聽聽聽 聽聽聽 聽聽聽 聽聽聽
聽聽聽 聽聽聽 聽聽聽 聽聽聽 counter = counter +1;//鎷呭績澶勭悊澶箙锛孞obTracker闀挎椂闂存病鏈夋敹鍒版姤鍛婁細璁や负TaskTracker宸茬粡澶辨晥锛屽洜姝ゅ畾鏃舵姤鍛婁竴涓
聽聽聽 聽聽聽 聽聽聽 聽聽聽 if (counter == 1000)
聽聽聽 聽聽聽 聽聽聽 聽聽聽 {
聽聽聽 聽聽聽 聽聽聽 聽聽聽 聽聽聽 counter = 0;
聽聽聽 聽聽聽 聽聽聽 聽聽聽 聽聽聽 reporter.progress();
聽聽聽 聽聽聽 聽聽聽 聽聽聽 }
聽聽聽 聽聽聽 聽聽聽 }
聽聽聽 聽聽聽 聽聽聽 result.set(tmp);
聽聽聽 聽聽聽 聽聽聽 output.collect(newkey, result);//杈撳嚭鏈鍚庣殑姹囨荤粨鏋
聽聽聽 聽聽聽 }聽聽聽
聽聽聽 }
LogAnalysiser
/**
 * Command-line entry point: delegates to {@link #run(String[])} and prints
 * the stack trace of any exception instead of propagating it.
 *
 * @param args forwarded unchanged to {@code run}
 */
public static void main(String[] args) {
    try {
        run(args);
    }
    catch (Exception failure) {
        failure.printStackTrace();
    }
}
/**
 * Stages a local input directory in HDFS, runs the log-analysis job with two
 * reducers, copies the result back to the local output directory, and removes
 * the HDFS staging directories.
 *
 * <p>The staging locations are {@code /user/root/<input dir name>} and
 * {@code /user/root/<output dir name>-<yyyy.MM.dd>}. They are deleted even
 * when the job or the copy-back fails, so a failed run leaves nothing behind.
 *
 * @param args {@code args[0]} is the local input directory (must exist),
 *             {@code args[1]} is the local output directory (created if absent)
 * @throws Exception if staging, job execution, or copy-back fails
 */
public static void run(String[] args) throws Exception
{
    if (args == null || args.length < 2)
    {
        System.out.println("need inputpath and outputpath");
        return;
    }
    String inputpath = args[0];
    String outputpath = args[1];

    File inputdir = new File(inputpath);
    File outputdir = new File(outputpath);
    if (!inputdir.exists() || !inputdir.isDirectory())
    {
        System.out.println("inputpath not exist or isn't dir!");
        return;
    }
    if (!outputdir.exists())
    {
        outputdir.mkdirs();
    }

    // File.getName() yields the last path element even when the argument ends
    // with a separator, which plain substring(lastIndexOf(separator)) does not.
    SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");
    String shortin = "/user/root/" + inputdir.getName();
    String shortout = "/user/root/" + outputdir.getName() + "-" + formater.format(new Date());
    Path hdfsIn = new Path(shortin);
    Path hdfsOut = new Path(shortout);

    // Build the job configuration.
    JobConf conf = new JobConf(new Configuration(), LogAnalysiser.class);
    FileSystem fileSys = FileSystem.get(conf);
    try
    {
        // Copy the local input files into HDFS.
        fileSys.copyFromLocalFile(new Path(inputpath), hdfsIn);

        conf.setJobName("analysisjob");
        conf.setOutputKeyClass(Text.class);           // output key type, checked by the OutputFormat
        conf.setOutputValueClass(LongWritable.class); // output value type, checked by the OutputFormat
        conf.setMapperClass(MapClass.class);
        conf.setCombinerClass(CombinerClass.class);
        conf.setReducerClass(ReduceClass.class);
        conf.setPartitionerClass(PartitionerClass.class);
        // Two reducers are required: one for traffic, one for hit counts.
        conf.setNumReduceTasks(2);
        FileInputFormat.setInputPaths(conf, shortin);  // input path in HDFS
        FileOutputFormat.setOutputPath(conf, hdfsOut); // output path in HDFS

        Date startTime = new Date();
        System.out.println("Job started: " + startTime);
        JobClient.runJob(conf);
        Date end_time = new Date();
        System.out.println("Job ended: " + end_time);
        System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) / 1000 + " seconds.");

        // Bring the result back to the local file system.
        fileSys.copyToLocalFile(hdfsOut, new Path(outputpath));
    }
    finally
    {
        // Remove the temporary HDFS input and output; attempt both deletes
        // even if the first one fails.
        try
        {
            fileSys.delete(hdfsIn, true);
        }
        finally
        {
            fileSys.delete(hdfsOut, true);
        }
    }
}
以上的代码就完成了所有的逻辑性代码，然后还需要一个注册驱动类来注册业务Class为一个可标示的命令，让hadoop jar可以执行。
public class ExampleDriver {
聽 public static void main(String argv[]){
聽聽聽 ProgramDriver pgd = new ProgramDriver();
聽聽聽 try {
聽聽聽聽聽 pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");
聽聽聽聽聽 pgd.driver(argv);
聽聽聽 }
聽聽聽 catch(Throwable e){
聽聽聽聽聽 e.printStackTrace();
聽聽聽 }
聽 }
}
将代码打成jar，并且设置jar的mainClass为ExampleDriver这个类。在分布式环境启动以后执行如下语句：
hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out
在/home/wenchu/test-in中是需要分析的日志文件，执行后就会看见整个执行过程，包括了Map和Reduce的进度。执行完毕会在/home/wenchu/test-out下看到输出的内容。有两个文件：part-00000和part-00001分别记录了统计后的结果。 如果需要看执行的具体情况，可以看在输出目录下的_logs/history/xxxx_analysisjob，里面罗列了所有的Map，Reduce的创建情况以及执行情况。在运行期也可以通过浏览器来查看Map,Reduce的情况：http://MasterIP:50030/jobtracker.jsp
Hadoop集群测试
首先这里使用上面的范例作为测试，也没有做太多的优化配置，这个测试结果只是为了看看集群的效果，以及一些参数配置的影响。
文件复制数为1，blocksize 5M
| Slave数 | 处理记录数(万条) | 执行时间（秒） |
| 2 | 95 | 38 |
| 2 | 950 | 337 |
| 4 | 95 | 24 |
| 4 | 950 | 178 |
| 6 | 95 | 21 |
| 6 | 950 | 114 |
聽
Blocksize 5M
| Slave数 | 处理记录数(万条) | 执行时间（秒） |
| 2（文件复制数为1） | 950 | 337 |
| 2（文件复制数为3） | 950 | 339 |
| 6（文件复制数为1） | 950 | 114 |
| 6（文件复制数为3） | 950 | 117 |
聽
文件复制数为1
| Slave数 | 处理记录数(万条) | 执行时间（秒） |
| 6(blocksize 5M) | 95 | 21 |
| 6(blocksize 77M) | 95 | 26 |
| 4(blocksize 5M) | 950 | 178 |
| 4(blocksize 50M) | 950 | 54 |
| 6(blocksize 5M) | 950 | 114 |
| 6(blocksize 50M) | 950 | 44 |
| 6(blocksize 77M) | 950 | 74 |
聽
测试的数据结果很稳定，基本测几次同样条件下都是一样。通过测试结果可以看出以下几点：
- 机器数对于性能还是有帮助的（等于没说^_^）。
- 文件复制数的增加只对安全性有帮助，但是对于性能没有太多帮助。而且现在采取的是将操作系统文件拷贝到HDFS中，所以备份多了，准备的时间很长。
- blocksize对于性能影响很大，首先如果将block划分的太小，那么将会增加job的数量，同时也增加了协作的代价，降低了性能，但是配置的太大也会让job不能最大化并行处理。所以这个值的配置需要根据数据处理的量来考虑。
- 最后就是除了这个表里面列出来的结果，应该去仔细看输出目录中的_logs/history中的xxx_analysisjob这个文件，里面记录了全部的执行过程以及读写情况。这个可以更加清楚地了解哪里可能会更加耗时。
随想
“云计算”热的烫手，就和SAAS、Web2及SNS等一样，往往都是在搞概念，只有真正踏踏实实的大型互联网公司，才会投入人力物力去研究符合自己的分布式计算。其实当你的数据量没有那么大的时候，这种分布式计算也就仅仅只是一个玩具而已，只有在真正解决问题的过程中，它深层次的问题才会被挖掘出来。
这三篇文章（分布式计算开源框架Hadoop介绍，Hadoop中的集群配置和使用技巧）仅仅是为了给对分布式计算有兴趣的朋友抛个砖，要想真的掘到金子，那么就踏踏实实的去用、去想、去分析。或者自己也会更进一步地去研究框架中的实现机制，在解决自己问题的同时，也能够贡献一些什么。
前几日看到有人跪求成为架构师的方式，看了有些可悲，有些可笑，其实有多少架构师知道什么叫做架构？架构师的职责是什么？与其追求这么一个名号，还不如踏踏实实地做块石头沉到水底。要知道，积累和沉淀的过程就是一种成长。



