Flink - accumulator
Reading accumulators

When a job finishes, the JobManager collects the accumulator values and sends them to the client as part of the job result:

    newJobStatus match {
      case JobStatus.FINISHED =>
        try {
          val accumulatorResults = executionGraph.getAccumulatorsSerialized()
          val result = new SerializedJobExecutionResult(
            jobID,
            jobInfo.duration,
            accumulatorResults)
          jobInfo.client ! decorateMessage(JobResultSuccess(result))
        }

When the client requests the accumulators:

    public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
        ActorGateway jobManagerGateway = getJobMa...
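
The flow above (collect the values, serialize them, deserialize them on the client) can be illustrated with a small stand-alone model. This is only a sketch and not Flink's implementation: `AccumulatorSketch`, `merge`, `serialize`, and `deserialize` are hypothetical names, it assumes the per-task partial values are plain `Long` counters merged by name, and it uses ordinary Java serialization. In Flink the client passes a `ClassLoader` because accumulator values may be user-defined types; `Long` needs no such loader, so the sketch leaves it out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone model; none of these names are Flink APIs.
class AccumulatorSketch {

    // Merge the partial counters reported by each task, keyed by accumulator name.
    static Map<String, Long> merge(List<Map<String, Long>> perTask) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> partial : perTask) {
            partial.forEach((name, value) -> merged.merge(name, value, Long::sum));
        }
        return merged;
    }

    // "JobManager" side: serialize each merged value separately so the
    // result can be shipped to the client as opaque bytes.
    static Map<String, byte[]> serialize(Map<String, Long> merged) throws IOException {
        Map<String, byte[]> out = new HashMap<>();
        for (Map.Entry<String, Long> e : merged.entrySet()) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(e.getValue());
            }
            out.put(e.getKey(), bytes.toByteArray());
        }
        return out;
    }

    // "Client" side: turn the bytes back into objects.
    static Map<String, Object> deserialize(Map<String, byte[]> serialized)
            throws IOException, ClassNotFoundException {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, byte[]> e : serialized.entrySet()) {
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(e.getValue()))) {
                out.put(e.getKey(), ois.readObject());
            }
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        // Two tasks report partial counts for the same job.
        List<Map<String, Long>> perTask = List.of(
                Map.of("lines", 3L),
                Map.of("lines", 4L, "errors", 1L));

        Map<String, Object> result = deserialize(serialize(merge(perTask)));
        System.out.println(result.get("lines"));  // prints 7
        System.out.println(result.get("errors")); // prints 1
    }
}
```

Serializing each value on its own, rather than the whole map at once, means one value that fails to deserialize on the client does not prevent reading the others.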