InstanceManager用于管理JobManager申请到的taskManager和slots资源
/**
* Simple manager that keeps track of which TaskManager are available and alive.
*
* <p>Registered TaskManagers are represented as {@link Instance} objects and are
* indexed in two maps so that they can be looked up either by InstanceID or by
* ResourceID.
*/
public class InstanceManager {
// ------------------------------------------------------------------------
// Fields
// ------------------------------------------------------------------------
// Each Instance is indexed twice: once by its InstanceID and once by its ResourceID.
/** Set of hosts known to run a task manager that are thus able to execute tasks (by ID). */
private final Map<InstanceID, Instance> registeredHostsById;
/** Set of hosts known to run a task manager that are thus able to execute tasks (by ResourceID). */
private final Map<ResourceID, Instance> registeredHostsByResource;
/** Set of hosts that were present once and have died */
private final Set<ResourceID> deadHosts;
/** Listeners that want to be notified about availability and disappearance of instances */
private final List<InstanceListener> instanceListeners = new ArrayList<>(); // who must be notified when Instance resources change, e.g. the Scheduler
/** The total number of task slots that the system has */
private int totalNumberOfAliveTaskSlots;
关键的操作:
registerTaskManager
/**
 * Registers a task manager so that it becomes available for job execution.
 *
 * <p>A fresh {@code InstanceID} is generated, a new {@code Instance} is created for the
 * TaskManager and stored under both its InstanceID and its ResourceID, the total slot
 * count is increased by {@code numberOfSlots}, a heartbeat is reported on the new
 * instance, and finally all registered listeners are notified. All of this happens
 * while holding {@code this.lock}.
 *
 * @param taskManagerGateway gateway to the task manager
 * @param taskManagerLocation Location info of the TaskManager
 * @param resources Hardware description of the TaskManager
 * @param numberOfSlots Number of available slots on the TaskManager
 * @return The assigned InstanceID of the registered task manager
 */
public InstanceID registerTaskManager(
        TaskManagerGateway taskManagerGateway,
        TaskManagerLocation taskManagerLocation,
        HardwareDescription resources,
        int numberOfSlots) {
    synchronized (this.lock) {
        final InstanceID assignedId = new InstanceID();
        final Instance registered =
                new Instance(taskManagerGateway, taskManagerLocation, assignedId, resources, numberOfSlots);

        // Index the new instance under both of its keys.
        registeredHostsById.put(assignedId, registered);
        registeredHostsByResource.put(taskManagerLocation.getResourceID(), registered);

        totalNumberOfAliveTaskSlots = totalNumberOfAliveTaskSlots + numberOfSlots;
        registered.reportHeartBeat();

        // Tell every listener (for example the scheduler) about the new instance.
        notifyNewInstance(registered);
        return assignedId;
    }
}
其中,notifyNewInstance
/**
 * Informs every registered {@code InstanceListener} that a new instance is available.
 *
 * <p>The listener list is locked for the duration of the iteration. A failure thrown
 * by one listener is logged and does not prevent the remaining listeners from being
 * called.
 *
 * @param instance the newly available instance to announce
 */
private void notifyNewInstance(Instance instance) {
    synchronized (this.instanceListeners) {
        for (InstanceListener subscriber : this.instanceListeners) {
            try {
                subscriber.newInstanceAvailable(instance);
            } catch (Throwable failure) {
                // Best effort: log and carry on with the next listener.
                LOG.error("Notification of new instance availability failed.", failure);
            }
        }
    }
}
Instance
看注释,instance就是一种抽象
用于描述注册到JobManager,并准备接受work的TaskManager
/**
* An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager}
* registered at a JobManager and ready to receive work.
*/
public class Instance implements SlotOwner {
/** The instance gateway to communicate with the instance */
private final TaskManagerGateway taskManagerGateway;
/** The instance connection information for the data transfer. */
private final TaskManagerLocation location;
/** A description of the resources of the task manager */
private final HardwareDescription resources;
/** The ID identifying the taskManager. */
private final InstanceID instanceId;
/** The number of task slots available on the node */
private final int numberOfSlots;
/** A list of available slot positions */
private final Queue<Integer> availableSlots; // note: this holds slot positions, not Slot objects, because a Slot is only created at the moment it is allocated
/** Allocated slots on this taskManager */
private final Set<Slot> allocatedSlots = new HashSet<Slot>();
/** A listener to be notified upon new slot availability */
private SlotAvailabilityListener slotAvailabilityListener; // notified when a slot becomes available again
/** Time when the last heartbeat has been received from the task manager running on this taskManager. */
private volatile long lastReceivedHeartBeat = System.currentTimeMillis();
核心的操作:
申请slot
/**
 * Allocates a simple slot on this TaskManager instance, or returns {@code null} when
 * no slot position is currently free.
 *
 * <p>Under {@code instanceLock}, the next free slot position is taken from
 * {@code availableSlots}; a {@code SimpleSlot} is created for that position and
 * recorded in {@code allocatedSlots}.
 *
 * @param jobID The ID of the job that the slot is allocated for.
 *
 * @return A simple slot that represents a task slot on this TaskManager instance, or null, if the
 *         TaskManager instance has no more slots available.
 *
 * @throws InstanceDiedException Declared for the case that the instance is no longer alive by
 *         the time the slot is allocated. NOTE(review): no liveness check is visible in this
 *         method body; confirm where the exception is raised.
 */
public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException {
    synchronized (instanceLock) {
        // Only positions are queued; the slot object itself is created on demand below.
        final Integer freePosition = availableSlots.poll();
        if (freePosition == null) {
            return null;
        }

        final SimpleSlot allocated =
                new SimpleSlot(jobID, this, location, freePosition, taskManagerGateway);
        allocatedSlots.add(allocated);
        return allocated;
    }
}
归还slot
/**
 * Returns a slot that has been allocated from this instance. The slot needs have been canceled
 * prior to calling this method.
 *
 * <p>The method will transition the slot to the "released" state. If the slot is already in state
 * "released", this method will do nothing and return {@code false}.</p>
 *
 * <p>If the slot was marked released by this call and is tracked in {@code allocatedSlots},
 * its slot number is put back into {@code availableSlots} and the
 * {@code slotAvailabilityListener}, if one is set, is notified while {@code instanceLock}
 * is still held.</p>
 *
 * @param slot The slot to return.
 * @return True, if the slot was returned, false if not (the slot could not be marked as
 *         released, or it is not tracked as allocated by this instance).
 */
@Override
public boolean returnAllocatedSlot(Slot slot) {
    // Nothing to do when markReleased() reports that this call did not release the slot.
    if (!slot.markReleased()) {
        return false;
    }

    LOG.debug("Return allocated slot {}.", slot);

    synchronized (instanceLock) {
        // The slot is not tracked as allocated here, so there is no position to hand back.
        if (!this.allocatedSlots.remove(slot)) {
            return false;
        }

        // Make the slot's position allocatable again.
        this.availableSlots.add(slot.getSlotNumber());

        if (this.slotAvailabilityListener != null) {
            this.slotAvailabilityListener.newSlotAvailable(this);
        }
        return true;
    }
}