| Trees | Indices | Help |
|---|
|
|
1 # -*- Mode: Python; test-case-name: flumotion.test.test_manager_manager -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3
4 # Flumotion - a streaming media server
5 # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L.
6 # Copyright (C) 2010,2011 Flumotion Services, S.A.
7 # All rights reserved.
8 #
9 # This file may be distributed and/or modified under the terms of
10 # the GNU Lesser General Public License version 2.1 as published by
11 # the Free Software Foundation.
12 # This file is distributed without any warranty; without even the implied
13 # warranty of merchantability or fitness for a particular purpose.
14 # See "LICENSE.LGPL" in the source distribution for more information.
15 #
16 # Headers in this file shall remain intact.
17
18 """
19 manager implementation and related classes
20
21 API Stability: semi-stable
22
23 @var LOCAL_IDENTITY: an identity for the manager itself; can be used
24 to compare against to verify that the manager
25 requested an action
26 @type LOCAL_IDENTITY: L{LocalIdentity}
27 """
28
29 import os
30
31 from twisted.internet import reactor, defer
32 from twisted.spread import pb
33 from twisted.cred import portal
34 from zope.interface import implements
35
36 from flumotion.common import errors, interfaces, log, registry
37 from flumotion.common import planet, common, messages, reflectcall, server
38 from flumotion.common.i18n import N_, gettexter
39 from flumotion.common.identity import RemoteIdentity, LocalIdentity
40 from flumotion.common.netutils import addressGetHost
41 from flumotion.common.planet import moods
42 from flumotion.configure import configure
43 from flumotion.manager import admin, component, worker, base, config
44 from flumotion.twisted import portal as fportal
45 from flumotion.project import project
46
47 __all__ = ['ManagerServerFactory', 'Vishnu']
48 __version__ = "$Rev$"
49 T_ = gettexter()
50 LOCAL_IDENTITY = LocalIdentity('manager')
51
52
53 # an internal class
54
55
57 """
58 I implement L{twisted.cred.portal.IRealm}.
59 I make sure that when a L{pb.Avatar} is requested through me, the
60 Avatar being returned knows about the mind (client) requesting
61 the Avatar.
62 """
63
64 implements(portal.IRealm)
65
66 logCategory = 'dispatcher'
67
69 """
70 @param computeIdentity: see L{Vishnu.computeIdentity}
71 @type computeIdentity: callable
72 """
73 self._interfaceHeavens = {} # interface -> heaven
74 self._computeIdentity = computeIdentity
75 self._bouncer = None
76 self._avatarKeycards = {} # avatarId -> keycard
77
79 """
80 @param bouncer: the bouncer to authenticate with
81 @type bouncer: L{flumotion.component.bouncers.bouncer}
82 """
83 self._bouncer = bouncer
84
86 """
87 Register a Heaven as managing components with the given interface.
88
89 @type interface: L{twisted.python.components.Interface}
90 @param interface: a component interface to register the heaven with.
91 """
92 assert isinstance(heaven, base.ManagerHeaven)
93
94 self._interfaceHeavens[interface] = heaven
95
96 ### IRealm methods
97
99
100 def got_avatar(avatar):
101 if avatar.avatarId in heaven.avatars:
102 raise errors.AlreadyConnectedError(avatar.avatarId)
103 heaven.avatars[avatar.avatarId] = avatar
104 self._avatarKeycards[avatar.avatarId] = keycard
105
106 # OK so this is byzantine, but test_manager_manager actually
107 # uses these kwargs to set its own info. so don't change
108 # these args or their order or you will break your test
109 # suite.
110
111 def cleanup(avatarId=avatar.avatarId, avatar=avatar, mind=mind):
112 self.info('lost connection to client %r', avatar)
113 del heaven.avatars[avatar.avatarId]
114 avatar.onShutdown()
115 # avoid leaking the keycard
116 keycard = self._avatarKeycards.pop(avatarId)
117 if self._bouncer:
118 try:
119 self._bouncer.removeKeycard(keycard)
120 except KeyError:
121 self.warning("bouncer forgot about keycard %r",
122 keycard)
123
124 return (pb.IPerspective, avatar, cleanup)
125
126 def got_error(failure):
127 # If we failed for some reason, we want to drop the connection.
128 # However, we want the failure to get to the client, so we don't
129 # call loseConnection() immediately - we return the failure first.
130 # loseConnection() will then not drop the connection until it has
131 # finished sending the current data to the client.
132 reactor.callLater(0, mind.broker.transport.loseConnection)
133 return failure
134
135 if pb.IPerspective not in ifaces:
136 raise errors.NoPerspectiveError(avatarId)
137 if len(ifaces) != 2:
138 # IPerspective and the specific avatar interface.
139 raise errors.NoPerspectiveError(avatarId)
140 iface = [x for x in ifaces if x != pb.IPerspective][0]
141 if iface not in self._interfaceHeavens:
142 self.warning('unknown interface %r', iface)
143 raise errors.NoPerspectiveError(avatarId)
144
145 heaven = self._interfaceHeavens[iface]
146 klass = heaven.avatarClass
147 host = addressGetHost(mind.broker.transport.getPeer())
148 d = self._computeIdentity(keycard, host)
149 d.addCallback(lambda identity: \
150 klass.makeAvatar(heaven, avatarId, identity, mind))
151 d.addCallbacks(got_avatar, got_error)
152 return d
153
154
156 """
157 I am an object that ties together different objects related to a
158 component. I am used as values in a lookup hash in the vishnu.
159 """
160
162 self.state = None # ManagerComponentState; created first
163 self.id = None # avatarId of the eventual ComponentAvatar
164 self.avatar = None # ComponentAvatar
165 self.jobState = None # ManagerJobState of a running component
166
167
169 """
170 I am the toplevel manager object that knows about all
171 heavens and factories.
172
173 @cvar dispatcher: dispatcher to create avatars
174 @type dispatcher: L{Dispatcher}
175 @cvar workerHeaven: the worker heaven
176 @type workerHeaven: L{worker.WorkerHeaven}
177 @cvar componentHeaven: the component heaven
178 @type componentHeaven: L{component.ComponentHeaven}
179 @cvar adminHeaven: the admin heaven
180 @type adminHeaven: L{admin.AdminHeaven}
181 @cvar configDir: the configuration directory for
182 this Vishnu's manager
183 @type configDir: str
184 """
185
186 implements(server.IServable)
187
188 logCategory = "vishnu"
189
191 # create a Dispatcher which will hand out avatars to clients
192 # connecting to me
193 self.dispatcher = Dispatcher(self.computeIdentity)
194
195 self.workerHeaven = self._createHeaven(interfaces.IWorkerMedium,
196 worker.WorkerHeaven)
197 self.componentHeaven = self._createHeaven(interfaces.IComponentMedium,
198 component.ComponentHeaven)
199 self.adminHeaven = self._createHeaven(interfaces.IAdminMedium,
200 admin.AdminHeaven)
201
202 self.running = True
203
204 def setStopped():
205 self.running = False
206 reactor.addSystemEventTrigger('before', 'shutdown', setStopped)
207
208 if configDir is not None:
209 self.configDir = configDir
210 else:
211 self.configDir = os.path.join(configure.configdir,
212 "managers", name)
213
214 self.bouncer = None # used by manager to authenticate worker/component
215
216 self.bundlerBasket = registry.getRegistry().makeBundlerBasket()
217
218 self._componentMappers = {} # any object -> ComponentMapper
219
220 self.state = planet.ManagerPlanetState()
221 self.state.set('name', name)
222 self.state.set('version', configure.version)
223
224 self.plugs = {} # socket -> list of plugs
225
226 # create a portal so that I can be connected to, through our dispatcher
227 # implementing the IRealm and a bouncer
228 self.portal = fportal.BouncerPortal(self.dispatcher, None)
229 #unsafeTracebacks = 1 # for debugging tracebacks to clients
230 self.factory = pb.PBServerFactory(self.portal,
231 unsafeTracebacks=unsafeTracebacks)
232 self.connectionInfo = {}
233 self.setConnectionInfo(None, None, None)
234
236 """Cancel any pending operations in preparation for shutdown.
237
238 This method is mostly useful for unit tests; currently, it is
239 not called during normal operation. Note that the caller is
240 responsible for stopping listening on the port, as the the
241 manager does not have a handle on the twisted port object.
242
243 @returns: A deferred that will fire when the manager has shut
244 down.
245 """
246 if self.bouncer:
247 return self.bouncer.stop()
248 else:
249 return defer.succeed(None)
250
254
256 """Returns the manager's configuration as a string suitable for
257 importing via loadConfiguration().
258 """
259 return config.exportPlanetXml(self.state)
260
262 """
263 Return a bundler basket to unbundle from.
264 If the registry files were updated since the last time, the
265 bundlerbasket will be rebuilt.
266
267 @since: 0.2.2
268 @rtype: L{flumotion.common.bundle.BundlerBasket}
269 """
270 if registry.getRegistry().rebuildNeeded():
271 self.info("Registry changed, rebuilding")
272 registry.getRegistry().verify(force=True)
273 self.bundlerBasket = registry.getRegistry().makeBundlerBasket()
274 elif not self.bundlerBasket.isUptodate(registry.getRegistry().mtime):
275 self.info("BundlerBasket is older than the Registry, rebuilding")
276 self.bundlerBasket = registry.getRegistry().makeBundlerBasket()
277 return self.bundlerBasket
278
280 """
281 Convenience message to construct a message and add it to the
282 planet state. `format' should be marked as translatable in the
283 source with N_, and *args will be stored as format arguments.
284 Keyword arguments are passed on to the message constructor. See
285 L{flumotion.common.messages.Message} for the meanings of the
286 rest of the arguments.
287
288 For example::
289
290 self.addMessage(messages.WARNING, 'foo-warning',
291 N_('The answer is %d'), 42, debug='not really')
292 """
293 self.addMessageObject(messages.Message(level,
294 T_(format, *args),
295 mid=mid, **kwargs))
296
298 """
299 Add a message to the planet state.
300
301 @type message: L{flumotion.common.messages.Message}
302 """
303 self.state.setitem('messages', message.id, message)
304
306 """
307 Clear any messages with the given message ID from the planet
308 state.
309
310 @type mid: message ID, normally a str
311 """
312 if mid in self.state.get('messages'):
313 self.state.delitem('messages', mid)
314
316 """
317 @param identity: L{flumotion.common.identity.Identity}
318 """
319 socket = 'flumotion.component.plugs.adminaction.AdminActionPlug'
320 if socket in self.plugs:
321 for plug in self.plugs[socket]:
322 plug.action(identity, message, args, kw)
323
325 """
326 Compute a suitable identity for a remote host. First looks to
327 see if there is a
328 L{flumotion.component.plugs.identity.IdentityProviderPlug} plug
329 installed on the manager, falling back to user@host.
330
331 The identity is only used in the adminaction interface. An
332 example of its use is when you have an adminaction plug that
333 checks an admin's privileges before actually doing an action;
334 the identity object you use here might store the privileges that
335 the admin has.
336
337 @param keycard: the keycard that the remote host used to log in.
338 @type keycard: L{flumotion.common.keycards.Keycard}
339 @param remoteHost: the ip of the remote host
340 @type remoteHost: str
341
342 @rtype: a deferred that will fire a
343 L{flumotion.common.identity.RemoteIdentity}
344 """
345
346 socket = 'flumotion.component.plugs.identity.IdentityProviderPlug'
347 if socket in self.plugs:
348 for plug in self.plugs[socket]:
349 identity = plug.computeIdentity(keycard, remoteHost)
350 if identity:
351 return identity
352 username = getattr(keycard, 'username', None)
353 return defer.succeed(RemoteIdentity(username, remoteHost))
354
356 """
357 Add a component state for the given component config entry.
358
359 @rtype: L{flumotion.common.planet.ManagerComponentState}
360 """
361
362 self.debug('adding component %s to %s'
363 % (conf.name, parent.get('name')))
364
365 if identity != LOCAL_IDENTITY:
366 self.adminAction(identity, '_addComponent', (conf, parent), {})
367
368 state = planet.ManagerComponentState()
369 state.set('name', conf.name)
370 state.set('type', conf.getType())
371 state.set('workerRequested', conf.worker)
372 state.setMood(moods.sleeping.value)
373 state.set('config', conf.getConfigDict())
374
375 state.set('parent', parent)
376 parent.append('components', state)
377
378 avatarId = conf.getConfigDict()['avatarId']
379
380 self.clearMessage('loadComponent-%s' % avatarId)
381
382 configDict = conf.getConfigDict()
383 projectName = configDict['project']
384 versionTuple = configDict['version']
385
386 projectVersion = None
387 try:
388 projectVersion = project.get(projectName, 'version')
389 except errors.NoProjectError:
390 m = messages.Warning(T_(N_(
391 "This component is configured for Flumotion project '%s', "
392 "but that project is not installed.\n"),
393 projectName))
394 state.append('messages', m)
395
396 if projectVersion:
397 self.debug('project %s, version %r, project version %r' % (
398 projectName, versionTuple, projectVersion))
399 if not common.checkVersionsCompat(
400 versionTuple,
401 common.versionStringToTuple(projectVersion)):
402 m = messages.Warning(T_(N_(
403 "This component is configured for "
404 "Flumotion '%s' version %s, "
405 "but you are running version %s.\n"
406 "Please update the configuration of the component.\n"),
407 projectName, common.versionTupleToString(versionTuple),
408 projectVersion))
409 state.append('messages', m)
410
411 # add to mapper
412 m = ComponentMapper()
413 m.state = state
414 m.id = avatarId
415 self._componentMappers[state] = m
416 self._componentMappers[avatarId] = m
417
418 return state
419
421 """
422 Add a new config object into the planet state.
423
424 @returns: a list of all components added
425 @rtype: list of L{flumotion.common.planet.ManagerComponentState}
426 """
427
428 self.debug('syncing up planet state with config')
429 added = [] # added components while parsing
430
431 def checkNotRunning(comp, parentState):
432 name = comp.getName()
433
434 comps = dict([(x.get('name'), x)
435 for x in parentState.get('components')])
436 runningComps = dict([(x.get('name'), x)
437 for x in parentState.get('components')
438 if x.get('mood') != moods.sleeping.value])
439 if name not in comps:
440 # We don't have it at all; allow it
441 return True
442 elif name not in runningComps:
443 # We have it, but it's not running. Allow it after deleting
444 # the old one.
445 oldComp = comps[name]
446 self.deleteComponent(oldComp)
447 return True
448
449 # if we get here, the component is already running; warn if
450 # the running configuration is different. Return False in
451 # all cases.
452 parent = comps[name].get('parent').get('name')
453 newConf = c.getConfigDict()
454 oldConf = comps[name].get('config')
455
456 if newConf == oldConf:
457 self.debug('%s already has component %s running with '
458 'same configuration', parent, name)
459 self.clearMessage('loadComponent-%s' % oldConf['avatarId'])
460 return False
461
462 self.info('%s already has component %s, but configuration '
463 'not the same -- notifying admin', parent, name)
464
465 diff = config.dictDiff(oldConf, newConf)
466 diffMsg = config.dictDiffMessageString(diff, 'existing', 'new')
467
468 self.addMessage(messages.WARNING,
469 'loadComponent-%s' % oldConf['avatarId'],
470 N_('Could not load component %r into %r: '
471 'a component is already running with '
472 'this name, but has a different '
473 'configuration.'), name, parent,
474 debug=diffMsg)
475 return False
476
477 state = self.state
478 atmosphere = state.get('atmosphere')
479 for c in conf.atmosphere.components.values():
480 if checkNotRunning(c, atmosphere):
481 added.append(self._addComponent(c, atmosphere, identity))
482
483 flows = dict([(x.get('name'), x) for x in state.get('flows')])
484 for f in conf.flows:
485 if f.name in flows:
486 flow = flows[f.name]
487 else:
488 self.info('creating flow %r', f.name)
489 flow = planet.ManagerFlowState(name=f.name, parent=state)
490 state.append('flows', flow)
491
492 for c in f.components.values():
493 if checkNotRunning(c, flow):
494 added.append(self._addComponent(c, flow, identity))
495
496 return added
497
499 # now start all components that need starting -- collecting into
500 # an temporary dict of the form {workerId => [components]}
501 componentsToStart = {}
502 for c in components:
503 workerId = c.get('workerRequested')
504 if not workerId in componentsToStart:
505 componentsToStart[workerId] = []
506 componentsToStart[workerId].append(c)
507 self.debug('_startComponents: componentsToStart %r' %
508 (componentsToStart, ))
509
510 for workerId, componentStates in componentsToStart.items():
511 self._workerCreateComponents(workerId, componentStates)
512
514 # makeBouncer only makes a bouncer if there is one in the config
515 d = defer.succeed(None)
516 d.addCallback(self._updateStateFromConf, conf, identity)
517 d.addCallback(self._startComponents, identity)
518 return d
519
521 """
522 Load the configuration from the given XML, merging it on top of
523 the currently running configuration.
524
525 @param file: file to parse, either as an open file object,
526 or as the name of a file to open
527 @type file: str or file
528 @param identity: The identity making this request.. This is used by the
529 adminaction logging mechanism in order to say who is
530 performing the action.
531 @type identity: L{flumotion.common.identity.Identity}
532 """
533 self.debug('loading configuration')
534 mid = 'loadComponent-parse-error'
535 if isinstance(file, str):
536 mid += '-%s' % file
537 try:
538 self.clearMessage(mid)
539 conf = config.PlanetConfigParser(file)
540 conf.parse()
541 return self._loadComponentConfiguration(conf, identity)
542 except errors.ConfigError, e:
543 self.addMessage(messages.WARNING, mid,
544 N_('Invalid component configuration.'),
545 debug=e.args[0])
546 return defer.fail(e)
547 except errors.UnknownComponentError, e:
548 if isinstance(file, str):
549 debug = 'Configuration loaded from file %r' % file
550 else:
551 debug = 'Configuration loaded remotely'
552 self.addMessage(messages.WARNING, mid,
553 N_('Unknown component in configuration: %s.'),
554 e.args[0], debug=debug)
555 return defer.fail(e)
556 except Exception, e:
557 self.addMessage(messages.WARNING, mid,
558 N_('Unknown error while loading configuration.'),
559 debug=log.getExceptionMessage(e))
560 return defer.fail(e)
561
563 # Load plugs
564 for socket, plugs in conf.plugs.items():
565 if not socket in self.plugs:
566 self.plugs[socket] = []
567
568 for args in plugs:
569 self.debug('loading plug type %s for socket %s'
570 % (args['type'], socket))
571 defs = registry.getRegistry().getPlug(args['type'])
572 e = defs.getEntry()
573 call = reflectcall.reflectCallCatching
574
575 plug = call(errors.ConfigError,
576 e.getModuleName(), e.getFunction(), args)
577 self.plugs[socket].append(plug)
578
580 for socket in self.plugs:
581 for plug in self.plugs[socket]:
582 self.debug('starting plug %r for socket %s', plug, socket)
583 plug.start(self)
584
586 if not (conf.bouncer):
587 self.warning('no bouncer defined, nothing can access the '
588 'manager')
589 return defer.succeed(None)
590
591 self.debug('going to start manager bouncer %s of type %s',
592 conf.bouncer.name, conf.bouncer.type)
593
594 defs = registry.getRegistry().getComponent(conf.bouncer.type)
595 entry = defs.getEntryByType('component')
596 # FIXME: use entry.getModuleName() (doesn't work atm?)
597 moduleName = defs.getSource()
598 methodName = entry.getFunction()
599 bouncer = reflectcall.createComponent(moduleName, methodName,
600 conf.bouncer.getConfigDict())
601 d = bouncer.waitForHappy()
602
603 def setupCallback(result):
604 bouncer.debug('started')
605 self.setBouncer(bouncer)
606
607 def setupErrback(failure):
608 self.warning('Error starting manager bouncer')
609 d.addCallbacks(setupCallback, setupErrback)
610 return d
611
613 """
614 Load manager configuration from the given XML. The manager
615 configuration is currently used to load the manager's bouncer
616 and plugs, and is only run once at startup.
617
618 @param file: file to parse, either as an open file object,
619 or as the name of a file to open
620 @type file: str or file
621 """
622 self.debug('loading configuration')
623 conf = config.ManagerConfigParser(file)
624 conf.parseBouncerAndPlugs()
625 self._loadManagerPlugs(conf)
626 self._loadManagerBouncer(conf)
627 conf.unlink()
628
629 __pychecker__ = 'maxargs=11' # hahaha
630
631 - def loadComponent(self, identity, componentType, componentId,
632 componentLabel, properties, workerName,
633 plugs, eaters, isClockMaster, virtualFeeds):
634 """
635 Load a component into the manager configuration.
636
637 See L{flumotion.manager.admin.AdminAvatar.perspective_loadComponent}
638 for a definition of the argument types.
639 """
640 self.debug('loading %s component %s on %s',
641 componentType, componentId, workerName)
642 parentName, compName = common.parseComponentId(componentId)
643
644 if isClockMaster:
645 raise NotImplementedError("Clock master components are not "
646 "yet supported")
647 if worker is None:
648 raise errors.ConfigError("Component %r needs to specify the"
649 " worker on which it should run"
650 % componentId)
651
652 state = self.state
653 compState = None
654
655 compConf = config.ConfigEntryComponent(compName, parentName,
656 componentType,
657 componentLabel,
658 properties,
659 plugs, workerName,
660 eaters, isClockMaster,
661 None, None, virtualFeeds)
662
663 if compConf.defs.getNeedsSynchronization():
664 raise NotImplementedError("Components that need "
665 "synchronization are not yet "
666 "supported")
667
668 if parentName == 'atmosphere':
669 parentState = state.get('atmosphere')
670 else:
671 flows = dict([(x.get('name'), x) for x in state.get('flows')])
672 if parentName in flows:
673 parentState = flows[parentName]
674 else:
675 self.info('creating flow %r', parentName)
676 parentState = planet.ManagerFlowState(name=parentName,
677 parent=state)
678 state.append('flows', parentState)
679
680 components = [x.get('name') for x in parentState.get('components')]
681 if compName in components:
682 self.debug('%r already has component %r', parentName, compName)
683 raise errors.ComponentAlreadyExistsError(compName)
684
685 compState = self._addComponent(compConf, parentState, identity)
686
687 self._startComponents([compState], identity)
688
689 return compState
690
692 """
693 Create a heaven of the given klass that will send avatars to clients
694 implementing the given medium interface.
695
696 @param interface: the medium interface to create a heaven for
697 @type interface: L{flumotion.common.interfaces.IMedium}
698 @param klass: the type of heaven to create
699 @type klass: an implementor of L{flumotion.common.interfaces.IHeaven}
700 """
701 assert issubclass(interface, interfaces.IMedium)
702 heaven = klass(self)
703 self.dispatcher.registerHeaven(heaven, interface)
704 return heaven
705
707 """
708 @type bouncer: L{flumotion.component.bouncers.bouncer.Bouncer}
709 """
710 if self.bouncer:
711 self.warning("manager already had a bouncer, setting anyway")
712
713 self.bouncer = bouncer
714 self.portal.bouncer = bouncer
715 self.dispatcher.setBouncer(bouncer)
716
718 return self.factory
719
721 """
722 Create the given component. This will currently also trigger
723 a start eventually when the component avatar attaches.
724
725 The component should be sleeping.
726 The worker it should be started on should be present.
727 """
728 m = componentState.get('mood')
729 if m != moods.sleeping.value:
730 raise errors.ComponentMoodError("%r not sleeping but %s" % (
731 componentState, moods.get(m).name))
732
733 p = componentState.get('moodPending')
734 if p != None:
735 raise errors.ComponentMoodError(
736 "%r already has a pending mood %s" % (
737 componentState, moods.get(p).name))
738
739 # find a worker this component can start on
740 workerId = (componentState.get('workerName')
741 or componentState.get('workerRequested'))
742
743 if not workerId in self.workerHeaven.avatars:
744 raise errors.ComponentNoWorkerError(
745 "worker %s is not logged in" % workerId)
746 else:
747 return self._workerCreateComponents(workerId, [componentState])
748
750 # NB: reset moodPending if asked to stop without an avatar
751 # because we changed above to allow stopping even if moodPending
752 # is happy
753
754 def stopSad():
755 self.debug('asked to stop a sad component without avatar')
756 for mid in componentState.get('messages')[:]:
757 self.debug("Deleting message %r", mid)
758 componentState.remove('messages', mid)
759
760 componentState.setMood(moods.sleeping.value)
761 componentState.set('moodPending', None)
762 return defer.succeed(None)
763
764 def stopLost():
765
766 def gotComponents(comps):
767 return avatarId in comps
768
769 def gotJobRunning(running):
770 if running:
771 self.warning('asked to stop lost component %r, but '
772 'it is still running', avatarId)
773 # FIXME: put a message on the state to suggest a
774 # kill?
775 msg = "Cannot stop lost component which is still running."
776 raise errors.ComponentMoodError(msg)
777 else:
778 self.debug('component %r seems to be really lost, '
779 'setting to sleeping')
780 componentState.setMood(moods.sleeping.value)
781 componentState.set('moodPending', None)
782 return None
783
784 self.debug('asked to stop a lost component without avatar')
785 workerName = componentState.get('workerRequested')
786 if workerName and self.workerHeaven.hasAvatar(workerName):
787 self.debug('checking if component has job process running')
788 d = self.workerHeaven.getAvatar(workerName).getComponents()
789 d.addCallback(gotComponents)
790 d.addCallback(gotJobRunning)
791 return d
792 else:
793 self.debug('component lacks a worker, setting to sleeping')
794 d = defer.maybeDeferred(gotJobRunning, False)
795 return d
796
797 def stopUnknown():
798 msg = ('asked to stop a component without avatar in mood %s'
799 % moods.get(mood))
800 self.warning(msg)
801 return defer.fail(errors.ComponentMoodError(msg))
802
803 mood = componentState.get('mood')
804 stoppers = {moods.sad.value: stopSad,
805 moods.lost.value: stopLost}
806 return stoppers.get(mood, stopUnknown)()
807
809 # FIXME: This deferred is just the remote call; there's no actual
810 # deferred for completion of shutdown.
811 d = componentAvatar.stop()
812
813 return d
814
816 """
817 Stop the given component.
818 If the component was sad, we clear its sad state as well,
819 since the stop was explicitly requested by the admin.
820
821 @type componentState: L{planet.ManagerComponentState}
822
823 @rtype: L{twisted.internet.defer.Deferred}
824 """
825 self.debug('componentStop(%r)', componentState)
826 # We permit stopping a component even if it has a pending mood of
827 # happy, so that if it never gets to happy, we can still stop it.
828 if (componentState.get('moodPending') != None and
829 componentState.get('moodPending') != moods.happy.value):
830 self.debug("Pending mood is %r", componentState.get('moodPending'))
831
832 raise errors.BusyComponentError(componentState)
833
834 m = self.getComponentMapper(componentState)
835 if not m:
836 # We have a stale componentState for an already-deleted
837 # component
838 self.warning("Component mapper for component state %r doesn't "
839 "exist", componentState)
840 raise errors.UnknownComponentError(componentState)
841 elif not m.avatar:
842 return self._componentStopNoAvatar(componentState, m.id)
843 else:
844 return self._componentStopWithAvatar(componentState, m.avatar)
845
847 """
848 Set the given message on the given component's state.
849 Can be called e.g. by a worker to report on a crashed component.
850 Sets the mood to sad if it is an error message.
851 """
852 if not avatarId in self._componentMappers:
853 self.warning('asked to set a message on non-mapped component %s' %
854 avatarId)
855 return
856
857 m = self._componentMappers[avatarId]
858 m.state.append('messages', message)
859 if message.level == messages.ERROR:
860 self.debug('Error message makes component sad')
861 m.state.setMood(moods.sad.value)
862
863 # FIXME: unify naming of stuff like this
864
866 # called when a worker logs in
867 workerId = workerAvatar.avatarId
868 self.debug('vishnu.workerAttached(): id %s' % workerId)
869
870 # Create all components assigned to this worker. Note that the
871 # order of creation is unimportant, it's only the order of
872 # starting that matters (and that's different code).
873 components = [c for c in self._getComponentsToCreate()
874 if c.get('workerRequested') in (workerId, None)]
875 # So now, check what components worker is running
876 # so we can remove them from this components list
877 # also add components we have that are lost but not
878 # in list given by worker
879 d = workerAvatar.getComponents()
880
881 def workerAvatarComponentListReceived(workerComponents):
882 # list() is called to work around a pychecker bug. FIXME.
883 lostComponents = list([c for c in self.getComponentStates()
884 if c.get('workerRequested') == workerId and \
885 c.get('mood') == moods.lost.value])
886 for comp in workerComponents:
887 # comp is an avatarId string
888 # components is a list of {ManagerComponentState}
889 if comp in self._componentMappers:
890 compState = self._componentMappers[comp].state
891 if compState in components:
892 components.remove(compState)
893 if compState in lostComponents:
894 lostComponents.remove(compState)
895
896 for compState in lostComponents:
897 self.info(
898 "Restarting previously lost component %s on worker %s",
899 self._componentMappers[compState].id, workerId)
900 # We set mood to sleeping first. This allows things to
901 # distinguish between a newly-started component and a lost
902 # component logging back in.
903 compState.set('moodPending', None)
904 compState.setMood(moods.sleeping.value)
905
906 allComponents = components + lostComponents
907
908 if not allComponents:
909 self.debug(
910 "vishnu.workerAttached(): no components for this worker")
911 return
912
913 self._workerCreateComponents(workerId, allComponents)
914 d.addCallback(workerAvatarComponentListReceived)
915
916 reactor.callLater(0, self.componentHeaven.feedServerAvailable,
917 workerId)
918
920 """
921 Create the list of components on the given worker, sequentially, but
922 in no specific order.
923
924 @param workerId: avatarId of the worker
925 @type workerId: string
926 @param components: components to start
927 @type components: list of
928 L{flumotion.common.planet.ManagerComponentState}
929 """
930 self.debug("_workerCreateComponents: workerId %r, components %r" % (
931 workerId, components))
932
933 if not workerId in self.workerHeaven.avatars:
934 self.debug('worker %s not logged in yet, delaying '
935 'component start' % workerId)
936 return defer.succeed(None)
937
938 workerAvatar = self.workerHeaven.avatars[workerId]
939
940 d = defer.Deferred()
941
942 for c in components:
943 componentType = c.get('type')
944 conf = c.get('config')
945 self.debug('scheduling create of %s on %s'
946 % (conf['avatarId'], workerId))
947 d.addCallback(self._workerCreateComponentDelayed,
948 workerAvatar, c, componentType, conf)
949
950 d.addCallback(lambda result: self.debug(
951 '_workerCreateComponents(): completed setting up create chain'))
952
953 # now trigger the chain
954 self.debug('_workerCreateComponents(): triggering create chain')
955 d.callback(None)
956 #reactor.callLater(0, d.callback, None)
957 return d
958
959 - def _workerCreateComponentDelayed(self, result, workerAvatar,
960 componentState, componentType, conf):
961
962 avatarId = conf['avatarId']
963 nice = conf.get('nice', 0)
964
965 # we set the moodPending to HAPPY, so this component only gets
966 # asked to start once
967 componentState.set('moodPending', moods.happy.value)
968
969 d = workerAvatar.createComponent(avatarId, componentType, nice,
970 conf)
971 # FIXME: here we get the avatar Id of the component we wanted
972 # started, so now attach it to the planetState's component state
973 d.addCallback(self._createCallback, componentState)
974 d.addErrback(self._createErrback, componentState)
975
976 # FIXME: shouldn't we return d here to make sure components
977 # wait on each other to be started ?
978
980 self.debug('got avatarId %s for state %s' % (result, componentState))
981 m = self._componentMappers[componentState]
982 assert result == m.id, "received id %s is not the expected id %s" % (
983 result, m.id)
984
986 # FIXME: make ConfigError copyable so we can .check() it here
987 # and print a nicer warning
988 self.warning('failed to create component %s: %s',
989 state.get('name'), log.getFailureMessage(failure))
990
991 if failure.check(errors.ComponentAlreadyRunningError):
992 if self._componentMappers[state].jobState:
993 self.info('component appears to have logged in in the '
994 'meantime')
995 else:
996 self.info('component appears to be running already; '
997 'treating it as lost until it logs in')
998 state.setMood(moods.lost.value)
999 else:
1000 message = messages.Error(T_(
1001 N_("The component could not be started.")),
1002 debug=log.getFailureMessage(failure))
1003
1004 state.setMood(moods.sad.value)
1005 state.append('messages', message)
1006
1007 return None
1008
1010 # called when a worker logs out
1011 workerId = workerAvatar.avatarId
1012 self.debug('vishnu.workerDetached(): id %s' % workerId)
1013 # Get all sad components for the detached worker and set the mood to
1014 # sleeping
1015 sadComponents = list([c for c in self.getComponentStates()
1016 if c.get('workerRequested') == workerId and \
1017 c.get('mood') == moods.sad.value])
1018 map(lambda c: c.setMood(moods.sleeping.value), sadComponents)
1019
1021 # check if we have this flow yet and add if not
1022 if flowName == 'atmosphere':
1023 # treat the atmosphere like a flow, although it's not
1024 flow = self.state.get('atmosphere')
1025 else:
1026 flow = self._getFlowByName(flowName)
1027 if not flow:
1028 self.info('Creating flow "%s"' % flowName)
1029 flow = planet.ManagerFlowState()
1030 flow.set('name', flowName)
1031 flow.set('parent', self.state)
1032 self.state.append('flows', flow)
1033
1034 componentState.set('parent', flow)
1035 flow.append('components', componentState)
1036
1038 # fetch or create a new mapper
1039 m = (self.getComponentMapper(componentAvatar.avatarId)
1040 or ComponentMapper())
1041
1042 m.state = componentAvatar.componentState
1043 m.jobState = componentAvatar.jobState
1044 m.id = componentAvatar.avatarId
1045 m.avatar = componentAvatar
1046
1047 self._componentMappers[m.state] = m
1048 self._componentMappers[m.jobState] = m
1049 self._componentMappers[m.id] = m
1050 self._componentMappers[m.avatar] = m
1051
1053 # called when the component is logging out
1054 # clear up jobState and avatar
1055 self.debug('unregisterComponent(%r): cleaning up state' %
1056 componentAvatar)
1057
1058 m = self._componentMappers[componentAvatar]
1059
1060 # unmap jobstate
1061 try:
1062 del self._componentMappers[m.jobState]
1063 except KeyError:
1064 self.warning('Could not remove jobState for %r' % componentAvatar)
1065 m.jobState = None
1066
1067 m.state.set('pid', None)
1068 m.state.set('workerName', None)
1069 m.state.set('moodPending', None)
1070
1071 # unmap avatar
1072 del self._componentMappers[m.avatar]
1073 m.avatar = None
1074
1076 cList = self.state.getComponents()
1077 self.debug('getComponentStates(): %d components' % len(cList))
1078 for c in cList:
1079 self.log(repr(c))
1080 mood = c.get('mood')
1081 if mood == None:
1082 self.warning('%s has mood None' % c.get('name'))
1083
1084 return cList
1085
1087 """
1088 Empty the planet of the given component.
1089
1090 @returns: a deferred that will fire when all listeners have been
1091 notified of the removal of the component.
1092 """
1093 self.debug('deleting component %r from state', componentState)
1094 c = componentState
1095 if c not in self._componentMappers:
1096 raise errors.UnknownComponentError(c)
1097
1098 flow = componentState.get('parent')
1099 if (c.get('moodPending') != None
1100 or c.get('mood') is not moods.sleeping.value):
1101 raise errors.BusyComponentError(c)
1102
1103 del self._componentMappers[self._componentMappers[c].id]
1104 del self._componentMappers[c]
1105 return flow.remove('components', c)
1106
1111
1113 """
1114 Empty the planet of a flow.
1115
1116 @returns: a deferred that will fire when the flow is removed.
1117 """
1118
1119 flow = self._getFlowByName(flowName)
1120 if flow is None:
1121 raise ValueError("No flow called %s found" % (flowName, ))
1122
1123 components = flow.get('components')
1124 for c in components:
1125 # if any component is already in a mood change/command, fail
1126 if (c.get('moodPending') != None or
1127 c.get('mood') is not moods.sleeping.value):
1128 raise errors.BusyComponentError(c)
1129 for c in components:
1130 del self._componentMappers[self._componentMappers[c].id]
1131 del self._componentMappers[c]
1132 d = flow.empty()
1133 d.addCallback(lambda _: self.state.remove('flows', flow))
1134 return d
1135
1137 """
1138 Empty the planet of all components, and flows. Also clears all
1139 messages.
1140
1141 @returns: a deferred that will fire when the planet is empty.
1142 """
1143 for mid in self.state.get('messages').keys():
1144 self.clearMessage(mid)
1145
1146 # first get all components to sleep
1147 components = self.getComponentStates()
1148
1149 # if any component is already in a mood change/command, fail
1150 components = [c for c in components
1151 if c.get('moodPending') != None]
1152 if components:
1153 state = components[0]
1154 raise errors.BusyComponentError(
1155 state,
1156 "moodPending is %s" % moods.get(state.get('moodPending')))
1157
1158 # filter out the ones that aren't sleeping and stop them
1159 components = [c for c in self.getComponentStates()
1160 if c.get('mood') is not moods.sleeping.value]
1161
1162 # create a big deferred for stopping everything
1163 d = defer.Deferred()
1164
1165 self.debug('need to stop %d components: %r' % (
1166 len(components), components))
1167
1168 for c in components:
1169 avatar = self._componentMappers[c].avatar
1170 # If this has logged out, but isn't sleeping (so is sad or lost),
1171 # we won't have an avatar. So, stop if it we can.
1172 if avatar:
1173 d.addCallback(lambda result, a: a.stop(), avatar)
1174 else:
1175 assert (c.get('mood') is moods.sad.value or
1176 c.get('mood') is moods.lost.value)
1177
1178 d.addCallback(self._emptyPlanetCallback)
1179
1180 # trigger the deferred after returning
1181 reactor.callLater(0, d.callback, None)
1182
1183 return d
1184
1186 # gets called after all components have stopped
1187 # cleans up the rest of the planet state
1188 components = self.getComponentStates()
1189 self.debug('_emptyPlanetCallback: need to delete %d components' %
1190 len(components))
1191
1192 for c in components:
1193 if c.get('mood') is not moods.sleeping.value:
1194 self.warning('Component %s is not sleeping', c.get('name'))
1195 # clear mapper; remove componentstate and id
1196 m = self._componentMappers[c]
1197 del self._componentMappers[m.id]
1198 del self._componentMappers[c]
1199
1200 # if anything's left, we have a mistake somewhere
1201 l = self._componentMappers.keys()
1202 if len(l) > 0:
1203 self.warning('mappers still has keys %r' % (repr(l)))
1204
1205 dList = []
1206
1207 dList.append(self.state.get('atmosphere').empty())
1208
1209 for f in self.state.get('flows'):
1210 self.debug('appending deferred for emptying flow %r' % f)
1211 dList.append(f.empty())
1212 self.debug('appending deferred for removing flow %r' % f)
1213 dList.append(self.state.remove('flows', f))
1214 self.debug('appended deferreds')
1215
1216 dl = defer.DeferredList(dList)
1217 return dl
1218
1220 """
1221 @rtype: list of L{flumotion.common.planet.ManagerComponentState}
1222 """
1223 # return a list of components that are sleeping
1224 components = self.state.getComponents()
1225
1226 # filter the ones that are sleeping
1227 # NOTE: now sleeping indicates that there is no existing job
1228 # as when jobs are created, mood becomes waking, so no need to
1229 # filter on moodPending
1230 isSleeping = lambda c: c.get('mood') == moods.sleeping.value
1231 components = filter(isSleeping, components)
1232 return components
1233
1235 # returns the WorkerAvatar with the given name
1236 if not workerName in self.workerHeaven.avatars:
1237 raise errors.ComponentNoWorkerError("Worker %s not logged in?"
1238 % workerName)
1239
1240 return self.workerHeaven.avatars[workerName]
1241
1243 if workerName in self.workerHeaven.avatars:
1244 return self._getWorker(workerName).feedServerPort
1245 return None
1246
1248 """
1249 Requests a number of ports on the worker named workerName. The
1250 ports will be reserved for the use of the caller until
1251 releasePortsOnWorker is called.
1252
1253 @returns: a list of ports as integers
1254 """
1255 return self._getWorker(workerName).reservePorts(numPorts)
1256
1258 """
1259 Tells the manager that the given ports are no longer being used,
1260 and may be returned to the allocation pool.
1261 """
1262 try:
1263 return self._getWorker(workerName).releasePorts(ports)
1264 except errors.ComponentNoWorkerError, e:
1265 self.warning('could not release ports: %r' % e.args)
1266
1268 """
1269 Look up an object mapper given the object.
1270
1271 @rtype: L{ComponentMapper} or None
1272 """
1273 if object in self._componentMappers.keys():
1274 return self._componentMappers[object]
1275
1276 return None
1277
1279 """
1280 Look up an object mapper given the object.
1281
1282 @rtype: L{ComponentMapper} or None
1283 """
1284 if object in self._componentMappers.keys():
1285 return self._componentMappers[object].state
1286
1287 return None
1288
1290 """
1291 Invokes method on all components of a certain type
1292 """
1293
1294 def invokeOnOneComponent(component, methodName, *args, **kwargs):
1295 m = self.getComponentMapper(component)
1296 if not m:
1297 self.warning('Component %s not mapped. Maybe deleted.',
1298 component.get('name'))
1299 raise errors.UnknownComponentError(component)
1300
1301 avatar = m.avatar
1302 if not avatar:
1303 self.warning('No avatar for %s, cannot call remote',
1304 component.get('name'))
1305 raise errors.SleepingComponentError(component)
1306
1307 try:
1308 return avatar.mindCallRemote(methodName, *args, **kwargs)
1309 except Exception, e:
1310 log_message = log.getExceptionMessage(e)
1311 msg = "exception on remote call %s: %s" % (methodName,
1312 log_message)
1313 self.warning(msg)
1314 raise errors.RemoteMethodError(methodName,
1315 log_message)
1316
1317 # only do this on happy or hungry components of type componentType
1318 dl_array = []
1319 for c in self.getComponentStates():
1320 if c.get('type') == componentType and \
1321 (c.get('mood') is moods.happy.value or
1322 c.get('mood') is moods.hungry.value):
1323 self.info("component %r to have %s run", c, methodName)
1324 d = invokeOnOneComponent(c, methodName, *args, **kwargs)
1325 dl_array.append(d)
1326 dl = defer.DeferredList(dl_array)
1327 return dl
1328
| Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Mon May 11 00:19:49 2015 | http://epydoc.sourceforge.net |