博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce高级编程——自定义InputFormat
阅读量:2395 次
发布时间:2019-05-10

本文共 6922 字,大约阅读时间需要 23 分钟。

0、测试集样例

ball, 3.5, 12.7, 9.0
car, 15, 23.76, 42.23
device, 0.0, 12.4, -67.1
 

1、测试Point3D InputFormat

import java.io.IOException;import java.net.URI;import javax.xml.soap.Text;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/** * desc:Custom Data Types TestPoint3DInputFormat *  * @author chenwq */public class TestPoint3DInputFormat {	 /**     * @param args     * @throws IOException      * @throws ClassNotFoundException      * @throws InterruptedException      */    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {        // TODO Auto-generated method stub        System.out.println("hello,chenwq!");        Job job=new Job();        Configuration conf=new Configuration();        FileSystem fs=FileSystem.get(URI.create(args[1]), conf);        fs.delete(new Path(args[1]));        job.setJobName("测试MyInputFormat程序。。。。。");        FileInputFormat.addInputPath(job, new Path(args[0]));        FileOutputFormat.setOutputPath(job, new Path(args[1]));        job.setInputFormatClass(Point3DinputFormat.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(Point3D.class);        job.setMapperClass(Point3DMapper.class);        job.setNumReduceTasks(0);        job.waitForCompletion(false);    }}

 

2、自定义类型Point3D需要实现Writable接口,才能在Hadoop环境中序列化传输;这里实现的是WritableComparable接口,因此它也可以作为需要排序的key使用

import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.WritableComparable;/** * desc:Custom Data Types Point *  * @author chenwq */public class Point3D implements WritableComparable {	public float x;	public float y;	public float z;	public Point3D(float x, float y, float z) {		this.x = x;		this.y = y;		this.z = z;	}	public Point3D() {		this(0.0f, 0.0f, 0.0f);	}	public void set(float x, float y, float z) {		this.x = x;		this.y = y;		this.z = z;	}	public void write(DataOutput out) throws IOException {		out.writeFloat(x);		out.writeFloat(y);		out.writeFloat(z);	}	public void readFields(DataInput in) throws IOException {		x = in.readFloat();		y = in.readFloat();		z = in.readFloat();	}	public String toString() {		return Float.toString(x) + ", " + Float.toString(y) + ", "				+ Float.toString(z);	}	public float distanceFromOrigin() {		return (float) Math.sqrt(x * x + y * y + z * z);	}	public int compareTo(Object other) {		float myDistance = this.distanceFromOrigin();		float otherDistance = ((Point3D) other).distanceFromOrigin();		return Float.compare(myDistance, otherDistance);	}	public boolean equals(Object o) {		Point3D other = (Point3D) o;		if (!(other instanceof Point3D)) {			return false;		}		return this.x == other.x && this.y == other.y && this.z == other.z;	}	public int hashCode() {		return Float.floatToIntBits(x) ^ Float.floatToIntBits(y)				^ Float.floatToIntBits(z);	}}

 3、自定义Point3DInputFormat类型,供MapReduce编程模型使用

import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.util.LineReader;public class Point3DinputFormat extends FileInputFormat
{ @Override protected boolean isSplitable(JobContext context, Path filename) { // TODO Auto-generated method stub return false; } @Override public RecordReader
createRecordReader(InputSplit inputsplit, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub return new objPosRecordReader(); } public static class objPosRecordReader extends RecordReader
{ public LineReader in; public Text lineKey; public Point3D lineValue; public StringTokenizer token=null; public Text line; @Override public void close() throws IOException { // TODO Auto-generated method stub } @Override public Text getCurrentKey() throws IOException, InterruptedException { //lineKey.set(token.nextToken()); return lineKey; } @Override public Point3D getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return lineValue; } @Override public float getProgress() throws IOException, InterruptedException { // TODO Auto-generated method stub return 0; } @Override public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub FileSplit split=(FileSplit)input; Configuration job=context.getConfiguration(); Path file=split.getPath(); FileSystem fs=file.getFileSystem(job); FSDataInputStream filein=fs.open(file); in=new LineReader(filein,job); line=new Text(); lineKey=new Text(); lineValue=new Point3D(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { // TODO Auto-generated method stub int linesize=in.readLine(line); if(linesize==0) return false; String[] pieces = line.toString().split(","); if(pieces.length != 4){ throw new IOException("Invalid record received"); } // try to parse floating point components of value float fx, fy, fz; try{ fx = Float.parseFloat(pieces[1].trim()); fy = Float.parseFloat(pieces[2].trim()); fz = Float.parseFloat(pieces[3].trim()); }catch(NumberFormatException nfe){ throw new IOException("Error parsing floating poing value in record"); } lineKey.set(pieces[0]); lineValue.set(fx, fy, fz); return true; } }}

 

4、编写Mapper类,这里仅仅测试自定义类型Point3D的InputFormat,不需要Reducer

import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class Point3DMapper extends Mapper
{ protected void map(Text key, Point3D value, Context context) throws IOException, InterruptedException{ context.write(key, value); }}
 

转载地址:http://cvwob.baihongyu.com/

你可能感兴趣的文章
PKI和X509证书
查看>>
使用HttpClient爬取国内疫情数据
查看>>
引用传递和值传递有什么区别
查看>>
C++从入门到放肆!
查看>>
C++是什么?怎么学?学完了能得到什么?
查看>>
初学C语言没有项目练手怎么行,这17个小项目收下不谢
查看>>
学好C语言,你只需要这几句口诀!
查看>>
选择大于努力!0基础学好C语言编程,首先要掌握的是什么?
查看>>
C语言和其他语言的不得不说的差别!
查看>>
夫妻俩在互联网公司工作,年收入曝光,网友:这么高!
查看>>
程序员5年工作经验,因频繁跳槽被面试官压工资!
查看>>
职场中神奇的程序员,却常常被人说“太直”,这是什么样的思维?
查看>>
@初学编程的朋友们,如果你能学得这些方法,学习将会更快一步!
查看>>
C/C++编程笔记:C/C++ 的编译和链接
查看>>
C/C++编程知识分享:C++四种强制转换,教你多种类型转换方式!
查看>>
全球最厉害的 14 位程序员,你知道几位呢?最厉害的研究出了它!
查看>>
C/C++基础语法复习(二):C++ 面向对象编程,你需要知道的点
查看>>
简述 C语言 有和 C++ 的基本区别,你真的懂吗?(新手面试必学)
查看>>
刚进职场的程序员,请万分珍重你的第一份工作,不要轻易辞职!
查看>>
C/C++之QT攻略——在QT中容易遇到的那些坑,千万别踩了!
查看>>