博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flume-ng HDFS sink原理解析
阅读量:5888 次
发布时间:2019-06-19

本文共 1618 字,大约阅读时间需要 5 分钟。

HDFS sink主要处理过程在process方法:

//循环batchSize次或者Channel为空

for(txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {

//该方法会调用BasicTransactionSemantics的具体实现

Event event = channel.take();

if (event == null) {

break;

}

......

//sfWriter是一个LRU缓存,缓存对文件Handler,最大打开文件由参数maxopenfiles控制

BucketWriter bucketWriter = sfWriters.get(lookupPath);

// 如果不存在,则构造一个缓存

if (bucketWriter == null) {

//通过HDFSWriterFactory根据filetype生成一个hdfswriter,由参数hdfs.Filetype控制;eg:HDFSDataStream

HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);

//idleCallback会在bucketWriter flush完毕后从LRU中删除;

bucketWriter = new BucketWriter(rollIntervalrollSizerollCount,

batchSizecontext, realPath, realName, inUsePrefixinUseSuffix,

suffixcodeCcompType,hdfsWriter, timedRollerPool,

proxyTicketsinkCounteridleTimeout, idleCallback,

lookupPath, callTimeoutcallTimeoutPool);

sfWriters.put(lookupPath, bucketWriter);

}

......

// track一个事务内的bucket

if (!writers.contains(bucketWriter)) {

writers.add(bucketWriter);

}

// 写数据到HDFS

bucketWriter.append(event);->

open();//如果底层支持append,则通过open接口打开;否则create接口

//判断是否进行日志切换

//根据复制的副本书和目标副本数做对比,如果不满足则doRotate=false

if(doRotate) {

close();

open();

}

HDFSWriter.append(event);

if(batchCounter == batchSize) {//如果达到batchSize行进行一次flush

flush();->

doFlush()->

HDFSWriter.sync()->

FSDataoutputStream.flush/sync

}

// 提交事务之前,刷新所有的bucket

for(BucketWriter bucketWriter : writers){

bucketWriter.flush();

}

transaction.commit();

这里,无论是BucketWriter执行appendsync还是rename等操作都是提交到一个后台线程池进行异步处理:callWithTimeout,这个线程池的大小是由hdfs.threadsize来设置;

本文转自MIKE老毕 51CTO博客,原文链接:http://blog.51cto.com/boylook/1298627,如需转载请自行联系原作者

你可能感兴趣的文章
WinForm程序中两份mdf文件问题的解决
查看>>
程序计数器、反汇编工具
查看>>
Android N: jack server failed
查看>>
如何将lotus 通讯簿导入到outlook 2003中
查看>>
WinForm 应用程序中开启新的进程及控制
查看>>
js replace,正则截取字符串内容
查看>>
Thinkphp5笔记三:创建基类
查看>>
查询反模式 - GroupBy、HAVING的理解
查看>>
Android中EditText,Button等控件的设置
查看>>
TextKit简单示例
查看>>
网格最短路径算法(Dijkstra & Fast Marching)(转)
查看>>
软链接和硬链接详解
查看>>
Redis_master-slave模式
查看>>
彻底卸载删除微软Win10易升方法
查看>>
SWT/JFACE之环境配置(一)
查看>>
应用程序日志中总是说MS DTC无法正确处理DC 升级/降级事件,是什么意思
查看>>
mybatis数据处理的几种方式
查看>>
作业2
查看>>
远程主机探测技术FAQ集 - 扫描篇
查看>>
C++中调用python函数
查看>>