hdfs auditlog(审计日志)
hdfs审计日志(Auditlog)记录了用户针对hdfs的所有操作,详细信息包括操作成功与否、用户名称、客户机地址、操作命令、操作的目录等。对于用户的每一个操作,namenode都会将这些信息以key-value对的形式组织成固定格式的一条日志,然后记录到audit.log文件中。通过审计日志,我们可以实时查看hdfs的各种操作状况、可以追踪各种误操作、可以做一些指标监控等等。
hdfs的审计日志功能是可插拔的,用户可以通过实现默认接口扩展出满足自己所需的插件来替换hdfs默认提供的审计日志功能,或者与之并用。
启用审计日志
如果仅仅只启用默认的AuditLogger(DefaultAuditLogger),则在log4j.properties添加如下配置(hdfs.audit.logger必须配置为INFO级别)即可,审计日志会与namenode的系统日志独立开来保存,log4j.appender.RFAAUDIT.File可配置保存的位置及文件。 FSNamesystem根据log4j.properties中hdfs.audit.logger是否为INFO,以及是否配置了DefaultAuditLogger之外的其他AuditLogger,来决定是否启用审计日志功能。
#
# hdfs audit logging
#
hdfs.audit.logger=INFO,NullAppender
hdfs.audit.log.maxfilesize=256MB
hdfs.audit.log.maxbackupindex=20
log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false
log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.log
log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize}
log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}
审计日志的接口及实现
Namenode开放了AuditLogger接口,并定义抽象类HdfsAuditLogger 实现AuditLogger,默认提供实现类DefaultAuditLogger。构造FSNamesystem时通过initAuditLoggers(Configuration conf)方法创建AuditLogger列表。在记录用户操作时,会将操作信息逐一传给列表中的每一个AuditLogger,由其做对应的审计处理。
通过实现Auditloger接口或者扩展HdfsAuditLogger类,用户可以实现自己的AuditLogger来满足所需,例如有针对性的记录审计日志(当集群、访问量上规模之后疯狂刷日志必然对namenode有影响,有针对性的记录有必要的日志是缓解此状况的一种可选方案)、扩展功能、将日志接入实时系统做实时分析或监控等。用户通过配置项dfs.namenode.audit.loggers在hdfs-site.xml中配置Auditloger的实现类,多个实现可以通过逗号分开,更改配置后重启namenode接口生效。FSNamesystem的initAuditLoggers(Configuration conf)方法通过该配置项加载并实例化实现类,初始化后存入集合。如果用户没有配置,那么默认使用DefaultAuditLogger。如果启动了nntop功能,还会使用TopAuditLogger。
FSNamesystem 初始化所有的AuditLogger:
private List<AuditLogger> initAuditLoggers(Configuration conf) {
// Initialize the custom access loggers if configured.
//DFS_NAMENODE_AUDIT_LOGGERS_KEY=dfs.namenode.audit.loggers
Collection<String> alClasses = conf.getStringCollection(DFS_NAMENODE_AUDIT_LOGGERS_KEY);
List<AuditLogger> auditLoggers = Lists.newArrayList();
if (alClasses != null && !alClasses.isEmpty()) {
for (String className : alClasses) {
try {
AuditLogger logger;
if (DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME.equals(className)) {
logger = new DefaultAuditLogger();
} else {
logger = (AuditLogger) Class.forName(className).newInstance();
}
logger.initialize(conf);
auditLoggers.add(logger);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
// Make sure there is at least one logger installed.
// 如果用户没有提供AuditLoggers,则默认使用DefaultAuditLogger
if (auditLoggers.isEmpty()) {
auditLoggers.add(new DefaultAuditLogger());
}
// Add audit logger to calculate top users
// 默认topConf.isEnabled是开启的,用于指标聚合、上报
// TopAuditLogger类似 top命令
if (topConf.isEnabled) {
topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs);
auditLoggers.add(new TopAuditLogger(topMetrics));
}
return Collections.unmodifiableList(auditLoggers);
}
DefaultAuditLogger记录日志:
@Override
public void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst,
FileStatus status, UserGroupInformation ugi,
DelegationTokenSecretManager dtSecretManager) {
if (auditLog.isInfoEnabled()) {
final StringBuilder sb = auditBuffer.get();
sb.setLength(0);
sb.append("allowed=").append(succeeded).append("\t");
sb.append("ugi=").append(userName).append("\t");
sb.append("ip=").append(addr).append("\t");
sb.append("cmd=").append(cmd).append("\t");
sb.append("src=").append(src).append("\t");
sb.append("dst=").append(dst).append("\t");
if (null == status) {
sb.append("perm=null");
} else {
sb.append("perm=");
sb.append(status.getOwner()).append(":");
sb.append(status.getGroup()).append(":");
sb.append(status.getPermission());
}
if (logTokenTrackingId) {
sb.append("\t").append("trackingId=");
String trackingId = null;
if (ugi != null && dtSecretManager != null
&& ugi.getAuthenticationMethod() == AuthenticationMethod.TOKEN) {
for (TokenIdentifier tid: ugi.getTokenIdentifiers()) {
if (tid instanceof DelegationTokenIdentifier) {
DelegationTokenIdentifier dtid =
(DelegationTokenIdentifier)tid;
trackingId = dtSecretManager.getTokenTrackingId(dtid);
break;
}
}
}
sb.append(trackingId);
}
sb.append("\t").append("proto=");
sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc");
logAuditMessage(sb.toString());
}
}
public void logAuditMessage(String message) {
auditLog.info(message);
}
审计过程
客户端对hdfs的所有操作,不管成功与否都会由FSNamesystem记录下。以删除操作为例,FSNamesystem在正常删除给定src后调用logAuditEvent(true, "delete", src)记录此次成功的delete操作,如果删除失败抛出异常,则调用logAuditEvent(false, "delete", src)记录此次失败的delete操作。
boolean delete(String src, boolean recursive, boolean logRetryCache)
throws IOException {
waitForLoadingFSImage();
BlocksMapUpdateInfo toRemovedBlocks = null;
writeLock();
boolean ret = false;
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot delete " + src);
toRemovedBlocks = FSDirDeleteOp.delete(
this, src, recursive, logRetryCache);
ret = toRemovedBlocks != null;
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
} finally {
writeUnlock();
}
getEditLog().logSync();
if (toRemovedBlocks != null) {
removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
}
logAuditEvent(true, "delete", src);
return ret;
}
//判断是否是外部调用,只对rpc调用和webHdfs调用做审计
boolean isExternalInvocation() {
return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();
}
//判断是否启用审计日志功能
public boolean isAuditEnabled() {
return !isDefaultAuditLogger || auditLog.isInfoEnabled();
}
//succeeded:操作是否成功 cmd:操作命令 src:操作对象
private void logAuditEvent(boolean succeeded, String cmd, String src)
throws IOException {
logAuditEvent(succeeded, cmd, src, null, null);
}
private void logAuditEvent(boolean succeeded, String cmd, String src,
String dst, HdfsFileStatus stat) throws IOException {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(succeeded, getRemoteUser(), getRemoteIp(),
cmd, src, dst, stat);
}
}
//获取操作对象的信息,调用所有的auditloger 做审计
private void logAuditEvent(boolean succeeded,
UserGroupInformation ugi, InetAddress addr, String cmd, String src,
String dst, HdfsFileStatus stat) {
FileStatus status = null;
if (stat != null) {
Path symlink = stat.isSymlink() ? new Path(stat.getSymlink()) : null;
Path path = dst != null ? new Path(dst) : new Path(src);
status = new FileStatus(stat.getLen(), stat.isDir(),
stat.getReplication(), stat.getBlockSize(), stat.getModificationTime(),
stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
stat.getGroup(), symlink, path);
}
for (AuditLogger logger : auditLoggers) {
if (logger instanceof HdfsAuditLogger) {
HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
hdfsLogger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst,
status, ugi, dtSecretManager);
} else {
logger.logAuditEvent(succeeded, ugi.toString(), addr,
cmd, src, dst, status);
}
}
}
审计日志接入实时系统的方法
- 方法1:扩展Log4J的appender,由appender将日志发送到kafka。
- 方法2:直接让kafka的producer读取日志文件。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
大众点评评分爬取-图文识别ORC
大众点评评分爬取-图文识别ORC 十一了,没出去玩,因为老婆要加班,我陪着。 晚上的时候她说要一些点评的评分数据,我合计了一下scrapy request一下应该很好做,就答应下来了,感觉没什么难度嘛。 但是呢没那么简单。需要人验证的问题就不说了,我觉得这个我也解决不了,比较吸引我的是他的评分展现方式。 大众点评这块展示用的是图片,css offset方式 selector那套行不通 这里我使用的 tesseract 图片文字识别 下面是大概流程 爬取页面 这里是使用Selenium进行页面访问,然后截屏 代码片段 opt = Options() opt.add_argument('--headless') self.driver = webdriver.Chrome(executable_path='/Users/xiangc/bin/chromedriver', options=opt) self.wait = WebDriverWait(self.driver, 10) self.driver.get('http://www.dianping.com/shop/4227604')...
-
下一篇
java使用比特币开源类库bitcoinj开发比特币入门指南
bitcoinj是一个使用比特币协议的库。它可以维护钱包,发送/接收交易而无需比特币核心的本地副本,并具有许多其他高级功能。它是用Java实现的,但可以通过任何JVM兼容语言中使用:包括Python和JavaScript中的示例。 它附带完整的文档,并建立了许多大型,众所周知的比特币应用程序和服务。下面我们来看看如何使用它。 初始设置 bitcoinj内置了日志记录和断言。无论是否指定了-ea标志,都会默认检查断言。记录由SLF4J库处理。它允许你选择你更喜欢使用的日志系统,例如JDK日志记录,Android日志记录等。默认情况下,我们使用简单的logger来打印stderr感兴趣的大部分内容。你可以通过切换lib目录中的jar文件来选择一个新的logger。 bitcoinj使用Maven作为其构建系统,并通过git分发。你可以使用源代码/ jar下载,但直接从源存储库获取它更安全。 要获取代码并安装它,请抓取Maven或Gradle,并将其添加到你的路径中。还要确保安装了git。可能你的Java IDE也有一些Maven/Gradle和Git集成,但是通过命令行使用它们还是非常有用...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS8编译安装MySQL8.0.19
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2全家桶,快速入门学习开发网站教程
- Dcoker安装(在线仓库),最新的服务器搭配容器使用