Package flumotion :: Package manager :: Module worker
[hide private]

Source Code for Module flumotion.manager.worker

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  manager-side objects to handle worker clients 
 24  """ 
 25   
 26  from twisted.internet import defer 
 27   
 28  from flumotion.manager import base 
 29  from flumotion.common import errors, interfaces, log, registry 
 30  from flumotion.common import worker, common 
 31  from flumotion.common.vfs import registerVFSJelly 
 32   
 33  __version__ = "$Rev: 7162 $" 
 34   
 35   
36 -class WorkerAvatar(base.ManagerAvatar):
37 """ 38 I am an avatar created for a worker. 39 A reference to me is given when logging in and requesting a worker avatar. 40 I live in the manager. 41 42 @ivar feedServerPort: TCP port the feed server is listening on 43 @type feedServerPort: int 44 """ 45 logCategory = 'worker-avatar' 46 47 _portSet = None 48 feedServerPort = None 49
50 - def __init__(self, heaven, avatarId, remoteIdentity, mind, 51 feedServerPort, ports, randomPorts):
52 base.ManagerAvatar.__init__(self, heaven, avatarId, 53 remoteIdentity, mind) 54 self.feedServerPort = feedServerPort 55 56 self._portSet = worker.PortSet(self.avatarId, ports, randomPorts) 57 58 self.heaven.workerAttached(self) 59 self.vishnu.workerAttached(self) 60 61 registerVFSJelly()
62
63 - def getName(self):
64 return self.avatarId
65
66 - def makeAvatarInitArgs(klass, heaven, avatarId, remoteIdentity, 67 mind):
68 69 def havePorts(res): 70 log.debug('worker-avatar', 'got port information') 71 (_s1, feedServerPort), (_s2, (ports, random)) = res 72 return (heaven, avatarId, remoteIdentity, mind, 73 feedServerPort, ports, random)
74 log.debug('worker-avatar', 'calling mind for port information') 75 d = defer.DeferredList([mind.callRemote('getFeedServerPort'), 76 mind.callRemote('getPorts')], 77 fireOnOneErrback=True) 78 d.addCallback(havePorts) 79 return d
80 makeAvatarInitArgs = classmethod(makeAvatarInitArgs) 81
82 - def onShutdown(self):
83 self.heaven.workerDetached(self) 84 self.vishnu.workerDetached(self) 85 base.ManagerAvatar.onShutdown(self)
86
87 - def reservePorts(self, numPorts):
88 """ 89 Reserve the given number of ports on the worker. 90 91 @param numPorts: how many ports to reserve 92 @type numPorts: int 93 """ 94 return self._portSet.reservePorts(numPorts)
95
96 - def releasePorts(self, ports):
97 """ 98 Release the given list of ports on the worker. 99 100 @param ports: list of ports to release 101 @type ports: list of int 102 """ 103 self._portSet.releasePorts(ports)
104
105 - def createComponent(self, avatarId, type, nice, conf):
106 """ 107 Create a component of the given type with the given nice level. 108 109 @param avatarId: avatarId the component should use to log in 110 @type avatarId: str 111 @param type: type of the component to create 112 @type type: str 113 @param nice: the nice level to create the component at 114 @type nice: int 115 @param conf: the component's config dict 116 @type conf: dict 117 118 @returns: a deferred that will give the avatarId the component 119 will use to log in to the manager 120 """ 121 self.debug('creating %s (%s) on worker %s with nice level %d', 122 avatarId, type, self.avatarId, nice) 123 defs = registry.getRegistry().getComponent(type) 124 try: 125 entry = defs.getEntryByType('component') 126 # FIXME: use entry.getModuleName() (doesn't work atm?) 127 moduleName = defs.getSource() 128 methodName = entry.getFunction() 129 except KeyError: 130 self.warning('no "component" entry in registry of type %s, %s', 131 type, 'falling back to createComponent') 132 moduleName = defs.getSource() 133 methodName = "createComponent" 134 135 self.debug('call remote create') 136 return self.mindCallRemote('create', avatarId, type, moduleName, 137 methodName, nice, conf)
138
139 - def getComponents(self):
140 """ 141 Get a list of components that the worker is running. 142 143 @returns: a deferred that will give the avatarIds running on the 144 worker 145 """ 146 self.debug('getting component list from worker %s' % 147 self.avatarId) 148 return self.mindCallRemote('getComponents')
149 150 ### IPerspective methods, called by the worker's component 151
152 - def perspective_componentAddMessage(self, avatarId, message):
153 """ 154 Called by the worker to tell the manager to add a given message to 155 the given component. 156 157 Useful in cases where the component can't report messages itself, 158 for example because it crashed. 159 160 @param avatarId: avatarId of the component the message is about 161 @type message: L{flumotion.common.messages.Message} 162 """ 163 self.debug('received message from component %s' % avatarId) 164 self.vishnu.componentAddMessage(avatarId, message)
165 166
167 -class WorkerHeaven(base.ManagerHeaven):
168 """ 169 I interface between the Manager and worker clients. 170 For each worker client I create an L{WorkerAvatar} to handle requests. 171 I live in the manager. 172 """ 173 174 logCategory = "workerheaven" 175 avatarClass = WorkerAvatar 176
177 - def __init__(self, vishnu):
180 181 ### my methods 182
183 - def workerAttached(self, workerAvatar):
184 """ 185 Notify the heaven that the given worker has logged in. 186 187 @type workerAvatar: L{WorkerAvatar} 188 """ 189 workerName = workerAvatar.getName() 190 if not workerName in self.state.get('names'): 191 # wheee 192 host = workerAvatar.mind.broker.transport.getPeer().host 193 state = worker.ManagerWorkerState(name=workerName, host=host) 194 self.state.append('names', workerName) 195 self.state.append('workers', state) 196 else: 197 self.warning('worker %s was already registered in the heaven', 198 workerName) 199 raise errors.AlreadyConnectedError()
200
201 - def workerDetached(self, workerAvatar):
202 """ 203 Notify the heaven that the given worker has logged out. 204 205 @type workerAvatar: L{WorkerAvatar} 206 """ 207 workerName = workerAvatar.getName() 208 try: 209 self.state.remove('names', workerName) 210 for state in list(self.state.get('workers')): 211 if state.get('name') == workerName: 212 self.state.remove('workers', state) 213 except ValueError: 214 self.warning('worker %s was never registered in the heaven', 215 workerName)
216