MapReduce是一种常用的分布式计算模型,通常用于大规模数据处理任务,在MapReduce中,序列化是一个至关重要的概念,因为它将数据转换为字节流,以便在网络中进行传输和存储,本文将详细介绍MapReduce中的序列化机制,包括其、Writable类、常用数据序列化类型以及自定义bean对象实现序列化的步骤。
一、序列化
序列化是将数据结构转换为字节流的过程,通常用于数据在网络中传输和存储,在MapReduce中,序列化尤为重要,因为MapReduce需要将数据分发到多个节点上进行并行计算,在MapReduce中,数据通常以键值对的形式存在,每个键值对都需要进行序列化。
二、Writable类
Writable类是MapReduce中用于序列化和反序列化数据的抽象类,它定义了两个方法:write和readFields,用户可以通过继承Writable来实现自定义数据类型的序列化和反序列化。
write方法
write方法用于将Writable对象转换为字节流,通常实现为将每个字段按照特定的格式写入到输出流中,write方法的实现应遵循以下规则:
写入的数据应该是有序的,并且写入的顺序应该与字段定义的顺序相同。
写入的数据应该是固定长度的,这样可以方便地进行反序列化。
写入的数据应该是可重复的,这样可以方便地进行分布式计算。
一个简单的Writable类的例子如下:
public class MyWritable implements Writable { private int field1; private String field2; @Override public void write(DataOutput out) throws IOException { out.writeInt(field1); out.writeUTF(field2); } @Override public void readFields(DataInput in) throws IOException { field1 = in.readInt(); field2 = in.readUTF(); } }
在这个例子中,MyWritable类有两个字段:field1和field2,write方法将field1和field2按照固定的顺序写入到输出流中,readFields方法从输入流中读取field1和field2的值。
readFields方法
readFields方法用于将字节流转换为Writable对象,通常实现为从输入流中读取每个字段的值,并将其设置到Writable对象的相应字段中,readFields方法的实现应遵循以下规则:
读取的数据应该是有序的,并且读取的顺序应该与字段定义的顺序相同。
读取的数据应该是固定长度的,这样可以方便地进行反序列化。
下面是一个读取自定义Writable对象的例子:
MyWritable obj = new MyWritable(); obj.readFields(in);
在这个例子中,我们创建了一个MyWritable对象,并调用了readFields方法将输入流中的数据读取到MyWritable对象中。
三、常用数据序列化类型
在MapReduce中,常用的序列化类型有int与IntWritable转化、Text与String序列化等,这些序列化类型通过简单的校验使得存储空间少、传输速度快。
int与IntWritable转化
IntWritable是一个专门用于序列化整数的类,下面是int与IntWritable之间的转换示例:
// b是int类型 IntWritable outV = new IntWritable(); outV.set(b); // a是IntWritable类型 int b = outV.get();
Text与String序列化
Text是Hadoop提供的用于序列化字符串的类,下面是Text与String之间的转换示例:
// Text --> String Text text = new Text(); String s = text.toString(); // String --> Text Text.set(string);
四、自定义bean对象实现序列化接口(Writable)
尽管Hadoop提供了一些常用的数据序列化类型,但它们并不能满足所有的需求,对于复杂的bean对象,我们需要实现Writable接口来进行序列化,自定义bean对象实现序列化的步骤如下:
1、实现Writable接口。
2、重写序列化方法write。
3、重写反序列化方法readFields。
4、反序列化时,需要反射调用空参构造函数,所以必须有空参构造器。
5、序列化的顺序和反序列化的顺序一致。
6、要想把结果显示在文件中,需要重写toString()方法,默认传输过来的是地址值,可用’t’分开,方便后续使用。
7、如果自定义的bean放在key中传输,还需要实现Comparable接口,因为MapReduce框架中的Shuffle过程要求对key必须能排序。
一个自定义bean对象的例子如下:
public class FlowBean implements Writable { private long upFlow; //上行流量 private long downFlow; //下行流量 private long sumFlow; //总流量 public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } //重载计算总流量函数,因为不会传总流量,只会通过上行流量与下行流量计算得出 public void setSumFlow() { this.sumFlow = this.upFlow + this.downFlow; } //3.重写空参构造 public FlowBean() { } //4.重写toString方法 @Override public String toString() { //输出会调用此对象的此方法,所以按输出的格式来写 return upFlow + "t" + downFlow + "t" + sumFlow; } //2.重写序列化方法 @Override public void write(DataOutput dataOutput) throws IOException { //这里的数据都是Long类型,所以使用writeLong dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } //5.重写反序列化方法 @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } }
这个FlowBean类实现了Writable接口,并重写了序列化和反序列化方法,它还重写了toString方法以便在输出时显示结果。
五、序列化案例实操
下面我们通过一个实际案例来演示如何在MapReduce中使用序列化,假设我们需要统计每个手机号的总上行流量、总下行流量和总流量,输入数据格式如下:
id 手机号 网络ip 域名 上行流量 下行流量 网络状态码
期望输出数据格式如下:
手机号 总上行流量 总下行流量 总流量
需求分析
Map阶段:输入的key是这一行的偏移量,输入的value是这一行的数据,输出的key是手机号,输出的value是一个包含上行流量、下行流量和总流量的bean对象。
Reduce阶段:累加上行流量和下行流量得到总流量。
编写MapReduce程序
FlowBean类
我们创建一个FlowBean类,该类实现了Writable接口,并包含了上行流量、下行流量和总流量的属性,代码如下:
public class FlowBean implements Writable { private long upFlow; //上行流量 private long downFlow; //下行流量 private long sumFlow; //总流量 public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } //重载计算总流量函数,因为不会传总流量,只会通过上行流量与下行流量计算得出 public void setSumFlow() { this.sumFlow = this.upFlow + this.downFlow; } //3.重写空参构造 public FlowBean() { } //4.重写toString方法 @Override public String toString() { //输出会调用此对象的此方法,所以按输出的格式来写 return upFlow + "t" + downFlow + "t" + sumFlow; } //2.重写序列化方法 @Override public void write(DataOutput dataOutput) throws IOException { //这里的数据都是Long类型,所以使用writeLong dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } //5.重写反序列化方法 @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } }
编写Mapper类和Reducer类
我们编写Mapper类和Reducer类,Mapper类的代码如下:
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("t"); String phoneNum = fields[1]; //提取手机号作为key long upFlow = Long.parseLong(fields[4]); //提取上行流量 long downFlow = Long.parseLong(fields[5]); //提取下行流量 FlowBean flowBean = new FlowBean(); //创建FlowBean对象并设置值 flowBean.setUpFlow(upFlow); flowBean.setDownFlow(downFlow); flowBean.setSumFlow(); //计算总流量并设置值 context.write(new Text(phoneNum), flowBean); //输出key-value对 } }
Reducer类的代码如下:
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> { @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long upFlowSum = 0; //初始化上行流量总和为0 long downFlowSum = 0; //初始化下行流量总和为0 for (FlowBean bean : values) { //遍历所有值并累加流量 upFlowSum += bean.getUpFlow(); //累加上行流量 downFlowSum += bean.getDownFlow(); //累加下行流量 } FlowBean result = new FlowBean(); //创建结果对象并设置值 result.setUpFlow(upFlowSum); //设置上行流量总和 result.setDownFlow(downFlowSum); //设置下行流量总和 result.setSumFlow(); //计算总流量并设置值 context.write(key, result); //输出结果key-value对 } }
编写Driver类并运行程序
我们编写Driver类并运行程序,Driver类的代码如下:
public class FlowDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //配置作业基本信息 Job job = Job.getInstance(conf, "flow"); //获取作业实例并设置作业名称为flow job.setJarByClass(FlowDriver.class); //设置主类为当前类 job.setMapperClass(FlowMapper.class); //设置Mapper类为FlowMapper类 job.setCombinerClass(FlowReducer.class); //设置Combiner类为FlowReducer类(可选) job.setReducerClass(FlowReducer.class); //设置Reducer类为FlowReducer类 job.setOutputKeyClass(Text.class); //设置输出key的类型为Text类型 job.setOutputValueClass(FlowBean.class); //设置输出value的类型为FlowBean类型 job.setPartitionerClass(HashPartitioner.class); //设置分区器为HashPartitioner类(可选) job.setNumReduceTasks(1); //设置reduce任务数量为1个(可选) FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/input")); //设置输入路径为hdfs://localhost:9000/input目录中的文件(根据实际情况修改) FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output")); //设置输出路径为hdfs://localhost:9000/output目录中的文件(根据实际情况修改) System.exit(job.waitForCompletion(true) ? 0 : 1); //等待作业完成并退出程序(返回值为0表示成功,非0表示失败)
到此,以上就是小编对于“mapreduce中的序列化_数据序列化”的问题就介绍到这了,希望介绍的几点解答对大家有用,有任何问题和不懂的,欢迎各位朋友在评论区讨论,给我留言。
最新评论
本站CDN与莫名CDN同款、亚太CDN、速度还不错,值得推荐。
感谢推荐我们公司产品、有什么活动会第一时间公布!
我在用这类站群服务器、还可以. 用很多年了。