就用單詞計數這個例子,需要統計的單詞存在HBase中的word表,MapReduce執行的時候從word表讀取數據,統計結束后將結果寫入到HBase的stat表中。
1、在eclipse中建立一個hadoop項目,然后從hbase的發布包中引入如下jar
hbase-0.94.13.jar zookeeper-3.4.5.jar protobuf-java-2.4.0a.jar guava-11.0.2.jar
2、在HBase中建立相關的表和初始化測試數據
package cn.luxh.app;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

/**
 * Seeds the HBase tables used by the word-count job: creates the input
 * table "word" (single column family "content"), loads five sample
 * sentences into it, and creates the empty output table "stat"
 * (single column family "result").
 */
public class InitData {

    public static void main(String[] args) throws IOException {
        // Create the input table "word" with one column family, "content".
        HBaseUtil.createTable("word", "content");

        HTable wordTable = HBaseUtil.getHTable("word");
        // Buffer puts client-side so the batch below goes out in one flush.
        wordTable.setAutoFlush(false);

        // Sample rows: the row key is a sequence number, the cell value a sentence.
        // NOTE(review): HBaseUtil.getPut presumably builds a Put for
        // (rowKey, family, qualifier, value); it is defined outside this file —
        // see the HBaseUtil reference linked below this listing.
        List<Put> sampleRows = new ArrayList<Put>();
        sampleRows.add(HBaseUtil.getPut("1", "content", null, "The Apache Hadoop software library is a framework"));
        sampleRows.add(HBaseUtil.getPut("2", "content", null, "The common utilities that support the other Hadoop modules"));
        sampleRows.add(HBaseUtil.getPut("3", "content", null, "Hadoop by reading the documentation"));
        sampleRows.add(HBaseUtil.getPut("4", "content", null, "Hadoop from the release page"));
        sampleRows.add(HBaseUtil.getPut("5", "content", null, "Hadoop on the mailing list"));

        // Submit the buffered test data and release the table handle.
        wordTable.put(sampleRows);
        wordTable.flushCommits();
        wordTable.close();

        // Create the output table "stat" with one column family, "result".
        HBaseUtil.createTable("stat", "result");
    }
}
1)代碼中的HBaseUtil工具類參考:http://www.cnblogs.com/luxh/archive/2013/04/16/3025172.html
2)執行上面的程序后,查看HBase中是否已創建成功
hbase(main):012:0> list
TABLE
stat
word
2 row(s) in 0.4730 seconds
3)查看word中的測試數據
hbase(main):005:0> scan 'word'
ROW COLUMN+CELL
1 column=content:, timestamp=1385447676510, value=The Apache Hadoo
p software library is a framework
2 column=content:, timestamp=1385447676510, value=The common utili
ties that support the other Hadoop modules
3 column=content:, timestamp=1385447676510, value=Hadoop by readin
g the documentation
4 column=content:, timestamp=1385447676510, value=Hadoop from the
release page
5 column=content:, timestamp=1385447676510, value=Hadoop on the ma
iling list
5 row(s) in 5.7810 seconds
3、MapReduce程序
package cn.luxh.app;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

/**
 * MapReduce word count over HBase: reads sentences from the "word" table
 * and writes one row per distinct word to the "stat" table, with the
 * count stored in column result:num.
 */
public class WordStat {

    /**
     * Mapper over HBase rows. TableMapper&lt;Text, IntWritable&gt;:
     * Text is the map-output key type, IntWritable the map-output value type.
     * Emits (word, 1) for every whitespace-separated token in the row's value.
     */
    public static class MyMapper extends TableMapper<Text, IntWritable> {

        private static IntWritable one = new IntWritable(1);
        private static Text word = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // The table has a single column family with one cell per row,
            // so the first KeyValue holds the whole sentence.
            String words = Bytes.toString(value.list().get(0).getValue());
            StringTokenizer st = new StringTokenizer(words);
            while (st.hasMoreTokens()) {
                word.set(st.nextToken());
                context.write(word, one);
            }
        }
    }

    /**
     * Reducer that writes to HBase. TableReducer&lt;Text, IntWritable,
     * ImmutableBytesWritable&gt;: Text/IntWritable are the reduce-input key and
     * value types, ImmutableBytesWritable the output key type. Sums the counts
     * for a word and stores the total under result:num, keyed by the word.
     */
    public static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            // One output row per word; the word itself is the row key.
            byte[] rowKey = Bytes.toBytes(key.toString());
            Put put = new Put(rowKey);
            // Store the count under qualifier "num" in family "result".
            // String.valueOf(sum) first converts the number to text; storing the
            // raw int bytes would show up as \x00\x00\x00... in the shell.
            put.add(Bytes.toBytes("result"),
                    Bytes.toBytes("num"),
                    Bytes.toBytes(String.valueOf(sum)));
            context.write(new ImmutableBytesWritable(rowKey), put);
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "wordstat");
        // BUG FIX: the original passed Blog.class, a class that does not exist
        // in this project — the job jar must be located via this driver class.
        job.setJarByClass(WordStat.class);

        Scan scan = new Scan();
        // Restrict the scan to the "content" family; the qualifier is null
        // because InitData wrote the cells with a null qualifier.
        scan.addColumn(Bytes.toBytes("content"), null);

        // Mapper reads from the "word" table.
        TableMapReduceUtil.initTableMapperJob("word", scan, MyMapper.class,
                Text.class, IntWritable.class, job);
        // Reducer writes to the "stat" table.
        TableMapReduceUtil.initTableReducerJob("stat", MyReducer.class, job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
等待程序執行結束,查看統計表stat
hbase(main):014:0> scan 'stat'
ROW COLUMN+CELL
Apache column=result:num, timestamp=1385449492309, value=1
Hadoop column=result:num, timestamp=1385449492309, value=5
The column=result:num, timestamp=1385449492309, value=2
a column=result:num, timestamp=1385449492309, value=1
by column=result:num, timestamp=1385449492309, value=1
common column=result:num, timestamp=1385449492309, value=1
documentation column=result:num, timestamp=1385449492309, value=1
framework column=result:num, timestamp=1385449492309, value=1
from column=result:num, timestamp=1385449492309, value=1
is column=result:num, timestamp=1385449492309, value=1
library column=result:num, timestamp=1385449492309, value=1
list column=result:num, timestamp=1385449492309, value=1
mailing column=result:num, timestamp=1385449492309, value=1
modules column=result:num, timestamp=1385449492309, value=1
on column=result:num, timestamp=1385449492309, value=1
other column=result:num, timestamp=1385449492309, value=1
page column=result:num, timestamp=1385449492309, value=1
reading column=result:num, timestamp=1385449492309, value=1
release column=result:num, timestamp=1385449492309, value=1
software column=result:num, timestamp=1385449492309, value=1
support column=result:num, timestamp=1385449492309, value=1
that column=result:num, timestamp=1385449492309, value=1
the column=result:num, timestamp=1385449492309, value=4
utilities column=result:num, timestamp=1385449492309, value=1
24 row(s) in 0.7970 seconds