Hadoop的多文件输出及自定义文件名方法是什么

本篇内容介绍了“Hadoop的多文件输出及自定义文件名方法是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

十载的濠江网站建设经验,针对设计、前端、开发、售后、文案、推广等六对一服务,响应快,48小时及时工作处理。营销型网站的优势是能够根据用户设备显示端的尺寸不同,自动调整濠江建站的显示方式,使网站能够适用不同显示终端,在浏览器中调整网站的宽度,无论在任何一种浏览器上浏览网站,都能展现优雅布局与设计,从而大程度地提升浏览体验。创新互联公司从事“濠江网站设计”,“濠江网站推广”以来,每个客户项目都认真落实执行。

    首先是输出格式的类,也就是job.setOutputFormatClass(……)参数列表中的类:

public class MoreFileOutputFormat extends Multiple
{
  @Override
  protected String generateFileNameForKeyValue(Text key, Text value,Configuration conf) 
  {
      return "Your name";
  }
}

    这里,继承Multiple类后必须重写generateFileNameForKeyValue()方法,这个方法返回的字符串作为输出文件的文件名。内容有各位自己根据需要编写。同时,key和value的值也根据自己的需要更换。

    接下来是Multiple模板类的代码:

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public abstract class Multiple, V extends Writable>
  extends FileOutputFormat 
{
   // 接口类,需要在调用程序中实现generateFileNameForKeyValue来获取文件名
   private MultiRecordWriter writer = null;
   public RecordWriter getRecordWriter(TaskAttemptContext job)
     throws IOException, InterruptedException 
   {
        if (writer == null) 
        {
             writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
   }
    
   /**
    * get task output path
    * 
    * @param conf
    * @return
    * @throws IOException
    */
   private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException
   {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) 
        {
             workPath = ((FileOutputCommitter) committer).getWorkPath();
        } 
        else 
        {
             Path outputPath = super.getOutputPath(conf);
             if (outputPath == null) 
             {
                  throw new IOException("Undefined job output-path");
             }
             workPath = outputPath;
        }
        return workPath;
   }
    
   //继承后重写以获得文件名
   protected abstract String generateFileNameForKeyValue(K key, V value,Configuration conf);
    
   //实现记录写入器RecordWriter类 (内部类)
   public class MultiRecordWriter extends RecordWriter 
   {
        /** RecordWriter的缓存 */
        private HashMap> recordWriters = null;
        private TaskAttemptContext job = null;
        
        /** 输出目录 */
        private Path workPath = null;
        public MultiRecordWriter(TaskAttemptContext job, Path workPath) 
        {
             super();
             this.job = job;
             this.workPath = workPath;
             recordWriters = new HashMap>();
        }
          
        @Override
        public void close(TaskAttemptContext context) throws IOException,
          InterruptedException 
        {
             Iterator> values = this.recordWriters.values().iterator();
             while (values.hasNext()) 
             {
                  values.next().close(context);
             }
             this.recordWriters.clear();
        }
          
        @Override
        public void write(K key, V value) throws IOException,
          InterruptedException 
        {
             // 得到输出文件名
             String baseName = generateFileNameForKeyValue(key, value,job.getConfiguration());
             // 如果recordWriters里没有文件名,那么就建立。否则就直接写值。
             RecordWriter rw = this.recordWriters.get(baseName);
             if (rw == null) 
             {
                  rw = getBaseRecordWriter(job, baseName);
                  this.recordWriters.put(baseName, rw);
             }
             rw.write(key, value);
        }
          
        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter getBaseRecordWriter(TaskAttemptContext job,
          String baseName) throws IOException, InterruptedException 
        {
             Configuration conf = job.getConfiguration();
             // 查看是否使用解码器
             boolean isCompressed = getCompressOutput(job);
             RecordWriter recordWriter = null;
             if (isCompressed) 
             {
                  Class codecClass = getOutputCompressorClass(
                    job, GzipCodec.class);
                  CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                  Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                  FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                  // 这里我使用的自定义的OutputFormat
                  recordWriter = new MyRecordWriter(new DataOutputStream(
                    codec.createOutputStream(fileOut)));
             } 
             else 
             {
                  Path file;
                  System.out.println("workPath = " + workPath + ", basename = " + baseName);
                  file = new Path(workPath, baseName);
                  FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                  // 这里我使用的自定义的OutputFormat
                  recordWriter = new MyRecordWriter(fileOut);
             }
             return recordWriter;
        }
   }
}

    现在来实现Multiple的内部类MultiRecordWriter中的MyRecordWriter类以实现自己想要的输出方式:

public class MyRecordWriter extends RecordWriter
{
   private static final String utf8 = "UTF-8";//定义字符编码格式     
   protected DataOutputStream out;  
       
   public MyRecordWriter(DataOutputStream out) 
   {
        this.out = out;  
   }
       
   private void writeObject(Object o) throws IOException 
   {
        if (o instanceof Text)
        {
             Text to = (Text) o;
             out.write(to.getBytes(), 0, to.getLength());
        }
        else
        {
               //输出成字节流。如果不是文本类的,请更改此处
             out.write(o.toString().getBytes(utf8));
        }
   }
     
   /** 
    * 将mapreduce的key,value以自定义格式写入到输出流中 
    */
   public synchronized void write(K key, V value) throws IOException
   {
        writeObject(value);
   }  
     
   public synchronized void close(TaskAttemptContext context) throws IOException
   {
        out.close();
   } 
}

    这个类中还有其它集中方法,不过笔者不需要那些方法,所以把它们都删除了,但最初的文件也删除了- -,所以现在找不到了。请大家见谅。

    现在,只需在main()或者run()函数中将job的输出格式设置成MoreFileOutputFormat类就行了,如下:

job.setOutputFormatClass(MoreFileOutputFormatClass);

“Hadoop的多文件输出及自定义文件名方法是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!


本文名称:Hadoop的多文件输出及自定义文件名方法是什么
URL分享:http://azwzsj.com/article/pjhijc.html