`
linest
  • 浏览: 150766 次
  • 性别: Icon_minigender_1
  • 来自: 内蒙古
社区版块
存档分类
最新评论

读代码-SequenceFilesFromDirectory

 
阅读更多
package org.apache.mahout.text; 目的：将目录下的文本文件转换为 SequenceFile 格式

main 函数入口：SequenceFilesFromDirectory 类

三个基本组件：fs、writer 和 filter
// Shared objects used by the conversion: the FileSystem obtained from conf,
// a ChunkedWriter that writes into `output`, and the path filter chosen below.
FileSystem fs = FileSystem.get(conf);
// Chunk size is read from the options map under CHUNK_SIZE_OPTION[0] and parsed as an int
// (presumably megabytes, matching ChunkedWriter's chunkSizeInMB — confirm).
ChunkedWriter writer = new ChunkedWriter(conf, Integer.parseInt(options.get(CHUNK_SIZE_OPTION[0])), output);
    
// Assigned below: either the default PrefixAdditionFilter or a reflectively loaded subclass.
SequenceFilesFromDirectoryFilter pathFilter;



默认使用 PrefixAdditionFilter 实现；
否则通过反射动态加载指定的 SequenceFilesFromDirectoryFilter 子类
// Name of the filter class to use, read from the options map under FILE_FILTER_CLASS_OPTION[0].
String fileFilterClassName = options.get(FILE_FILTER_CLASS_OPTION[0]);
    if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
      // Default filter: constructed directly, no reflection needed.
      pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer);
    } else {
      // Custom filter: load the class by name and require it to be a
      // SequenceFilesFromDirectoryFilter subclass (asSubclass throws ClassCastException otherwise).
      // NOTE(review): if the option is absent (null), Class.forName(null) will throw — confirm
      // the option always has a default.
      Class<? extends SequenceFilesFromDirectoryFilter> pathFilterClass = Class.forName(fileFilterClassName).asSubclass(SequenceFilesFromDirectoryFilter.class);
      // The subclass must declare a public (Configuration, String, Map, ChunkedWriter) constructor;
      // getConstructor only finds public constructors and throws NoSuchMethodException otherwise.
      Constructor<? extends SequenceFilesFromDirectoryFilter> constructor =
          pathFilterClass.getConstructor(Configuration.class, String.class, Map.class, ChunkedWriter.class);
      pathFilter = constructor.newInstance(conf, keyPrefix, options, writer);
    }



package org.apache.mahout.text; 抽象类 SequenceFilesFromDirectoryFilter
遍历路径下的文件，并对每个文件调用抽象方法 process
  /**
   * Lists the entries under {@code current} and passes each one to {@link #process}.
   *
   * <p>The return value is always {@code false}; the actual work is done in {@code process}.
   * Any {@link IOException} raised while listing or processing is rethrown wrapped in an
   * {@link IllegalStateException}.
   *
   * @param current the path whose children are visited
   * @return always {@code false}
   */
  @Override
  public final boolean accept(Path current) {
    log.debug("CURRENT: {}", current.getName());
    try {
      FileStatus[] children = fs.listStatus(current);
      for (int idx = 0; idx < children.length; idx++) {
        FileStatus child = children[idx];
        log.debug("CHILD: {}", child.getPath().getName());
        process(child, current);
      }
    } catch (IOException cause) {
      throw new IllegalStateException(cause);
    }
    return false;
  }

  /**
   * Handles one entry found while listing {@code current} in {@link #accept(Path)}.
   *
   * @param in      status of the child entry being visited
   * @param current the path that was listed to produce {@code in}
   * @throws IOException if the implementation fails to handle the entry; accept() rethrows it
   *                     wrapped in an IllegalStateException
   */
  protected abstract void process(FileStatus in, Path current) throws IOException;



package org.apache.mahout.text; public final class PrefixAdditionFilter extends SequenceFilesFromDirectoryFilter
默认实现：读取文件全文，以“前缀 + 路径”作为 key 写出
      // Read the whole file into memory line by line, then emit it as a single record.
      InputStream in = null;
      try {
        in = fs.open(fst.getPath());

        // Re-join the lines, appending '\n' after every line (including the last).
        // NOTE(review): the `false` argument presumably means lines come back without their
        // original terminators, so '\r\n' endings would be normalized to '\n' — confirm.
        StringBuilder file = new StringBuilder();
        for (String aFit : new FileLineIterable(in, charset, false)) {
          file.append(aFit).append('\n');
        }
        // If the child's name equals current's name, use it once; otherwise key by "dir/child".
        // Presumably the equal-name case arises when `current` is itself a file — confirm.
        String name = current.getName().equals(fst.getPath().getName())
            ? current.getName()
            : current.getName() + Path.SEPARATOR + fst.getPath().getName();
        // Key: prefix + "/" + name; value: the file's full text.
        writer.write(prefix + Path.SEPARATOR + name, file.toString());
      } 
      // NOTE(review): the catch/finally that closes `in` is not shown in this excerpt.



package org.apache.mahout.text; public final class ChunkedWriter implements Closeable
对 SequenceFile.Writer 的封装，按块大小将输出切分为多个文件

初始化
    // Convert the chunk size limit from megabytes to bytes.
    // NOTE(review): if chunkSizeInMB is an int, this product is computed in int arithmetic and
    // overflows for values >= 2048; consider `chunkSizeInMB * 1024L * 1024L` if the field is a
    // long — confirm the field type.
    maxChunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
    fs = FileSystem.get(conf);
    // The first chunk written is chunk-0 (see getPath).
    currentChunkID = 0;
    // Open the first SequenceFile with Text keys and Text values.
    writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID), Text.class, Text.class);


路径拼接
  /**
   * Builds the output path for a chunk.
   *
   * @param chunkID chunk number
   * @return {@code <output>/chunk-<chunkID>}
   */
  private Path getPath(int chunkID) {
    String chunkName = "chunk-".concat(String.valueOf(chunkID));
    return new Path(output, chunkName);
  }



当前块大小超过上限时，关闭当前 writer，并为下一个块新开一个 writer
  /**
   * Appends one key/value record to the current chunk.
   *
   * <p>If the current chunk has already grown past {@code maxChunkSizeInBytes}, the writer
   * first rolls over to a new chunk file. The check runs before the append, so a chunk may
   * exceed the limit by up to one record.
   *
   * @param key   record key, stored as {@link Text}
   * @param value record value, stored as {@link Text}
   * @throws IOException if closing the old writer, opening the new one, or appending fails
   */
  public void write(String key, String value) throws IOException {
    if (currentChunkSize > maxChunkSizeInBytes) {
      writer.close();
      // Advance the ID before opening: chunk-<currentChunkID> is already open/written.
      // The previous post-increment (getPath(currentChunkID++)) reopened the current chunk
      // on the first rollover and overwrote its data.
      currentChunkID++;
      writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID), Text.class, Text.class);
      currentChunkSize = 0;
    }

    Text keyT = new Text(key);
    Text valueT = new Text(value);
    // getLength() is the number of valid bytes. getBytes() returns the backing array, which
    // may be longer, so its length over-counts. SequenceFile framing overhead is not counted.
    currentChunkSize += keyT.getLength() + valueT.getLength();
    writer.append(keyT, valueT);
  }
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics