您现在的位置是:首页 > 文章详情

hdfs auditlog(审计日志)

日期:2018-10-07点击:741

        hdfs审计日志(Auditlog)记录了用户针对hdfs的所有操作,详细信息包括操作成功与否、用户名称、客户机地址、操作命令、操作的目录等。对于用户的每一个操作,namenode都会将这些信息以key-value对的形式组织成固定格式的一条日志,然后记录到audit.log文件中。通过审计日志,我们可以实时查看hdfs的各种操作状况、可以追踪各种误操作、可以做一些指标监控等等。

        hdfs的审计日志功能是可插拔的,用户可以通过实现默认接口扩展出满足自己所需的插件来替换hdfs默认提供的审计日志功能,或者与之并用。

启用审计日志  

        如果仅仅只启用默认的AuditLogger(DefaultAuditLogger),则在log4j.properties添加如下配置(hdfs.audit.logger必须配置为INFO级别)即可,审计日志会与namenode的系统日志独立开来保存,log4j.appender.RFAAUDIT.File可配置保存的位置及文件。 FSNamesystem根据log4j.properties中hdfs.audit.logger是否为INFO,以及是否配置了DefaultAuditLogger之外的其他AuditLogger,来决定是否启用审计日志功能。

# # hdfs audit logging # hdfs.audit.logger=INFO,NullAppender hdfs.audit.log.maxfilesize=256MB hdfs.audit.log.maxbackupindex=20 log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger} log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.log log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize} log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}

审计日志的接口及实现

        Namenode开放了AuditLogger接口,并定义抽象类HdfsAuditLogger 实现AuditLogger,默认提供实现类DefaultAuditLogger。构造FSNamesystem时通过initAuditLoggers(Configuration conf)方法创建AuditLogger列表。在记录用户操作时,会将操作信息逐一传给列表中的每一个AuditLogger,由其做对应的审计处理。

 

 

        hdfs定义的审计日志接口及提供的实现

 

 

 

 

 

 

 

 

       通过实现Auditloger接口或者扩展HdfsAuditLogger类,用户可以实现自己的AuditLogger来满足所需,例如有针对性的记录审计日志(当集群、访问量上规模之后疯狂刷日志必然对namenode有影响,有针对性的记录有必要的日志是缓解此状况的一种可选方案)、扩展功能、将日志接入实时系统做实时分析或监控等。用户通过配置项dfs.namenode.audit.loggers在hdfs-site.xml中配置Auditloger的实现类,多个实现可以通过逗号分开,更改配置后重启namenode接口生效。FSNamesystem的initAuditLoggers(Configuration conf)方法通过该配置项加载并实例化实现类,初始化后存入集合。如果用户没有配置,那么默认使用DefaultAuditLogger。如果启动了nntop功能,还会使用TopAuditLogger。

      FSNamesystem 初始化所有的AuditLogger:

private List<AuditLogger> initAuditLoggers(Configuration conf) { // Initialize the custom access loggers if configured. //DFS_NAMENODE_AUDIT_LOGGERS_KEY=dfs.namenode.audit.loggers Collection<String> alClasses = conf.getStringCollection(DFS_NAMENODE_AUDIT_LOGGERS_KEY); List<AuditLogger> auditLoggers = Lists.newArrayList(); if (alClasses != null && !alClasses.isEmpty()) { for (String className : alClasses) { try { AuditLogger logger; if (DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME.equals(className)) { logger = new DefaultAuditLogger(); } else { logger = (AuditLogger) Class.forName(className).newInstance(); } logger.initialize(conf); auditLoggers.add(logger); } catch (RuntimeException re) { throw re; } catch (Exception e) { throw new RuntimeException(e); } } } // Make sure there is at least one logger installed. // 如果用户没有提供AuditLoggers,则默认使用DefaultAuditLogger if (auditLoggers.isEmpty()) { auditLoggers.add(new DefaultAuditLogger()); } // Add audit logger to calculate top users // 默认topConf.isEnabled是开启的,用于指标聚合、上报 // TopAuditLogger类似 top命令 if (topConf.isEnabled) { topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs); auditLoggers.add(new TopAuditLogger(topMetrics)); } return Collections.unmodifiableList(auditLoggers); }

DefaultAuditLogger记录日志:

@Override public void logAuditEvent(boolean succeeded, String userName, InetAddress addr, String cmd, String src, String dst, FileStatus status, UserGroupInformation ugi, DelegationTokenSecretManager dtSecretManager) { if (auditLog.isInfoEnabled()) { final StringBuilder sb = auditBuffer.get(); sb.setLength(0); sb.append("allowed=").append(succeeded).append("\t"); sb.append("ugi=").append(userName).append("\t"); sb.append("ip=").append(addr).append("\t"); sb.append("cmd=").append(cmd).append("\t"); sb.append("src=").append(src).append("\t"); sb.append("dst=").append(dst).append("\t"); if (null == status) { sb.append("perm=null"); } else { sb.append("perm="); sb.append(status.getOwner()).append(":"); sb.append(status.getGroup()).append(":"); sb.append(status.getPermission()); } if (logTokenTrackingId) { sb.append("\t").append("trackingId="); String trackingId = null; if (ugi != null && dtSecretManager != null && ugi.getAuthenticationMethod() == AuthenticationMethod.TOKEN) { for (TokenIdentifier tid: ugi.getTokenIdentifiers()) { if (tid instanceof DelegationTokenIdentifier) { DelegationTokenIdentifier dtid = (DelegationTokenIdentifier)tid; trackingId = dtSecretManager.getTokenTrackingId(dtid); break; } } } sb.append(trackingId); } sb.append("\t").append("proto="); sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc"); logAuditMessage(sb.toString()); } } public void logAuditMessage(String message) { auditLog.info(message); }

 

审计过程

    客户端对hdfs的所有操作,不管成功与否都会由FSNamesystem记录下。以删除操作为例,FSNamesystem在正常删除给定src后调用logAuditEvent(true, "delete", src)记录此次成功的delete操作,如果删除失败抛出异常,则调用logAuditEvent(false, "delete", src)记录此次失败的delete操作。

 boolean delete(String src, boolean recursive, boolean logRetryCache) throws IOException { waitForLoadingFSImage(); BlocksMapUpdateInfo toRemovedBlocks = null; writeLock(); boolean ret = false; try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot delete " + src); toRemovedBlocks = FSDirDeleteOp.delete( this, src, recursive, logRetryCache); ret = toRemovedBlocks != null; } catch (AccessControlException e) { logAuditEvent(false, "delete", src); throw e; } finally { writeUnlock(); } getEditLog().logSync(); if (toRemovedBlocks != null) { removeBlocks(toRemovedBlocks); // Incremental deletion of blocks } logAuditEvent(true, "delete", src); return ret; } //判断是否是外部调用,只对rpc调用和webHdfs调用做审计 boolean isExternalInvocation() { return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation(); } //判断是否启用审计日志功能 public boolean isAuditEnabled() { return !isDefaultAuditLogger || auditLog.isInfoEnabled(); } //succeeded:操作是否成功 cmd:操作命令 src:操作对象 private void logAuditEvent(boolean succeeded, String cmd, String src) throws IOException { logAuditEvent(succeeded, cmd, src, null, null); } private void logAuditEvent(boolean succeeded, String cmd, String src, String dst, HdfsFileStatus stat) throws IOException { if (isAuditEnabled() && isExternalInvocation()) { logAuditEvent(succeeded, getRemoteUser(), getRemoteIp(), cmd, src, dst, stat); } } //获取操作对象的信息,调用所有的auditloger 做审计 private void logAuditEvent(boolean succeeded, UserGroupInformation ugi, InetAddress addr, String cmd, String src, String dst, HdfsFileStatus stat) { FileStatus status = null; if (stat != null) { Path symlink = stat.isSymlink() ? new Path(stat.getSymlink()) : null; Path path = dst != null ? new Path(dst) : new Path(src); status = new FileStatus(stat.getLen(), stat.isDir(), stat.getReplication(), stat.getBlockSize(), stat.getModificationTime(), stat.getAccessTime(), stat.getPermission(), stat.getOwner(), stat.getGroup(), symlink, path); } for (AuditLogger logger : auditLoggers) { if (logger instanceof HdfsAuditLogger) { HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger; hdfsLogger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst, status, ugi, dtSecretManager); } else { logger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst, status); } } }

审计日志接入实时系统的方法

  • 方法1:扩展Log4J的appender,由appender将日志发送到kafka。
  • 方法2:直接让kafka的producer读取日志文件。
原文链接:https://my.oschina.net/u/3987818/blog/2223349
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章