本文共 6922 字,大约阅读时间需要 23 分钟。
0、测试集样例
ball, 3.5, 12.7, 9.0
car, 15, 23.76, 42.23
device, 0.0, 12.4, -67.1
1、测试Point3D InputFormat
import java.io.IOException;import java.net.URI;import javax.xml.soap.Text;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/** * desc:Custom Data Types TestPoint3DInputFormat
* * @author chenwq */public class TestPoint3DInputFormat { /** * @param args * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // TODO Auto-generated method stub System.out.println("hello,chenwq!"); Job job=new Job(); Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(URI.create(args[1]), conf); fs.delete(new Path(args[1])); job.setJobName("测试MyInputFormat程序。。。。。"); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setInputFormatClass(Point3DinputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Point3D.class); job.setMapperClass(Point3DMapper.class); job.setNumReduceTasks(0); job.waitForCompletion(false); }}
2、自定义类型Point3D必须实现WritableComparable接口,才能在Hadoop环境中传输
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.WritableComparable;/** * desc:Custom Data Types Point
* * @author chenwq */public class Point3D implements WritableComparable { public float x; public float y; public float z; public Point3D(float x, float y, float z) { this.x = x; this.y = y; this.z = z; } public Point3D() { this(0.0f, 0.0f, 0.0f); } public void set(float x, float y, float z) { this.x = x; this.y = y; this.z = z; } public void write(DataOutput out) throws IOException { out.writeFloat(x); out.writeFloat(y); out.writeFloat(z); } public void readFields(DataInput in) throws IOException { x = in.readFloat(); y = in.readFloat(); z = in.readFloat(); } public String toString() { return Float.toString(x) + ", " + Float.toString(y) + ", " + Float.toString(z); } public float distanceFromOrigin() { return (float) Math.sqrt(x * x + y * y + z * z); } public int compareTo(Object other) { float myDistance = this.distanceFromOrigin(); float otherDistance = ((Point3D) other).distanceFromOrigin(); return Float.compare(myDistance, otherDistance); } public boolean equals(Object o) { Point3D other = (Point3D) o; if (!(other instanceof Point3D)) { return false; } return this.x == other.x && this.y == other.y && this.z == other.z; } public int hashCode() { return Float.floatToIntBits(x) ^ Float.floatToIntBits(y) ^ Float.floatToIntBits(z); }}
3、自定义Point3DInputFormat类型,供MapReduce编程模型使用
import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.util.LineReader;public class Point3DinputFormat extends FileInputFormat{ @Override protected boolean isSplitable(JobContext context, Path filename) { // TODO Auto-generated method stub return false; } @Override public RecordReader createRecordReader(InputSplit inputsplit, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub return new objPosRecordReader(); } public static class objPosRecordReader extends RecordReader { public LineReader in; public Text lineKey; public Point3D lineValue; public StringTokenizer token=null; public Text line; @Override public void close() throws IOException { // TODO Auto-generated method stub } @Override public Text getCurrentKey() throws IOException, InterruptedException { //lineKey.set(token.nextToken()); return lineKey; } @Override public Point3D getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return lineValue; } @Override public float getProgress() throws IOException, InterruptedException { // TODO Auto-generated method stub return 0; } @Override public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub FileSplit split=(FileSplit)input; Configuration job=context.getConfiguration(); Path file=split.getPath(); FileSystem fs=file.getFileSystem(job); FSDataInputStream 
filein=fs.open(file); in=new LineReader(filein,job); line=new Text(); lineKey=new Text(); lineValue=new Point3D(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { // TODO Auto-generated method stub int linesize=in.readLine(line); if(linesize==0) return false; String[] pieces = line.toString().split(","); if(pieces.length != 4){ throw new IOException("Invalid record received"); } // try to parse floating point components of value float fx, fy, fz; try{ fx = Float.parseFloat(pieces[1].trim()); fy = Float.parseFloat(pieces[2].trim()); fz = Float.parseFloat(pieces[3].trim()); }catch(NumberFormatException nfe){ throw new IOException("Error parsing floating poing value in record"); } lineKey.set(pieces[0]); lineValue.set(fx, fy, fz); return true; } }}
4、编写Mapper类,这里仅仅测试自定义类型Point3D的InputFormat,不需要Reducer
import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class Point3DMapper extends Mapper{ protected void map(Text key, Point3D value, Context context) throws IOException, InterruptedException{ context.write(key, value); }}
转载地址:http://cvwob.baihongyu.com/