Source Code for Module flumotion.component.feedcomponent010

# -*- Mode: Python; test-case-name: flumotion.test.test_feedcomponent010 -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

import gst
import gobject

import os
import time

from twisted.internet import reactor, defer

from flumotion.common import common, errors, pygobject, messages, log
from flumotion.common import gstreamer
from flumotion.common.i18n import N_, gettexter
from flumotion.common.planet import moods
from flumotion.component import component as basecomponent
from flumotion.component import feed, padmonitor
from flumotion.component.feeder import Feeder
from flumotion.component.eater import Eater

__version__ = "$Rev: 8800 $"
T_ = gettexter()

class FeedComponent(basecomponent.BaseComponent):
    """
    I am a base class for all Flumotion feed components.
    """

    # how often to update the UIState feeder statistics
    FEEDER_STATS_UPDATE_FREQUENCY = 12.5
    keepStreamheaderForLater = False
    dropStreamHeaders = True
    swallowNewSegment = True

    logCategory = 'feedcomponent'

    ### BaseComponent interface implementations

    def init(self):
        # add keys for eaters and feeders uiState
        self.feeders = {} # feeder feedName -> Feeder
        self.eaters = {} # eater eaterAlias -> Eater
        self.uiState.addListKey('feeders')
        self.uiState.addListKey('eaters')

        self.pipeline = None
        self.pipeline_signals = []
        self.bus_signal_id = None
        self.effects = {}
        self._feeder_probe_cl = None

        self._pad_monitors = padmonitor.PadMonitorSet(
            lambda: self.setMood(moods.happy),
            lambda: self.setMood(moods.hungry))

        self._clock_slaved = False
        self.clock_provider = None
        self._master_clock_info = None # (ip, port, basetime) if we're the
                                       # clock master

        self._change_monitor = gstreamer.StateChangeMonitor()

        # multifdsink's get-stats signal had critical bugs before this version
        self._get_stats_supported = (gstreamer.get_plugin_version('tcp')
                                     >= (0, 10, 11, 0))

    def do_setup(self):
        """
        Sets up component.

        Invokes the L{create_pipeline} and L{set_pipeline} vmethods,
        which subclasses can provide.
        """
        config = self.config
        eater_config = config.get('eater', {})
        feeder_config = config.get('feed', [])
        source_config = config.get('source', [])

        self.debug("FeedComponent.do_setup(): eater_config %r", eater_config)
        self.debug("FeedComponent.do_setup(): feeder_config %r", feeder_config)
        self.debug("FeedComponent.do_setup(): source_config %r", source_config)
        # for upgrade of code without restarting managers
        # this will only be for components whose eater name in registry is
        # default, so no need to import registry and find eater name
        if eater_config == {} and source_config != []:
            eater_config = {'default': [(x, 'default') for x in source_config]}

        for eaterName in eater_config:
            for feedId, eaterAlias in eater_config[eaterName]:
                self.eaters[eaterAlias] = Eater(eaterAlias, eaterName)
                self.uiState.append('eaters', self.eaters[eaterAlias].uiState)

        for feederName in feeder_config:
            self.feeders[feederName] = Feeder(feederName)
            self.uiState.append('feeders',
                                self.feeders[feederName].uiState)

        clockMaster = config.get('clock-master', None)
        if clockMaster:
            self._clock_slaved = clockMaster != config['avatarId']
        else:
            self._clock_slaved = False

        pipeline = self.create_pipeline()
        self.connect_feeders(pipeline)
        self.set_pipeline(pipeline)

        self.debug("FeedComponent.do_setup(): setup finished")

        self.try_start_pipeline()

        # no race, messages marshalled asynchronously via the bus
        d = self._change_monitor.add(gst.STATE_CHANGE_PAUSED_TO_PLAYING)
        d.addCallback(lambda x: self.do_pipeline_playing())
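
    # A sketch (not from the original module; values are hypothetical) of
    # the config shapes do_setup() consumes. 'eater' maps an eater name to
    # (feedId, eaterAlias) pairs, and 'feed' lists feeder names:
    #
    #   config = {
    #       'eater': {'default': [('producer:video', 'default')]},
    #       'feed': ['default'],
    #       'clock-master': None,
    #       'avatarId': '/default/encoder',
    #   }
    #
    # If 'eater' is empty but a legacy 'source' list is present, the
    # upgrade path above synthesizes the equivalent 'default' eater config.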

    def setup_completed(self):
        # Just log; we override the superclass to not turn happy here.
        # Instead, we turn happy once the pipeline gets to PLAYING.
        self.debug("Setup completed")

    ### FeedComponent interface for subclasses

    def create_pipeline(self):
        """
        Subclasses have to implement this method.

        @rtype: L{gst.Pipeline}
        """
        raise NotImplementedError(
            "subclass must implement create_pipeline")
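
    # A minimal subclass sketch (hypothetical; the element names only
    # illustrate the convention of naming elements after their eater alias
    # and feeder name so the base class can look them up):
    #
    #   class PassthroughComponent(FeedComponent):
    #
    #       def create_pipeline(self):
    #           return gst.parse_launch(
    #               'fdsrc name=eater:default ! gdpdepay ! '
    #               'gdppay ! multifdsink name=feeder:default')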

    def set_pipeline(self, pipeline):
        """
        Subclasses can override me.
        They should chain up first.
        """
        if self.pipeline:
            self.cleanup()
        self.pipeline = pipeline
        self._setup_pipeline()

    def attachPadMonitorToFeeder(self, feederName):
        elementName = self.feeders[feederName].payName
        element = self.pipeline.get_by_name(elementName)
        if not element:
            raise errors.ComponentError("No such feeder %s" % feederName)

        pad = element.get_pad('src')
        self._pad_monitors.attach(pad, elementName)

    ### FeedComponent methods

    def addEffect(self, effect):
        self.effects[effect.name] = effect
        effect.setComponent(self)

    def connect_feeders(self, pipeline):
        # Connect to the client-fd-removed signals on each feeder, so we
        # can clean up properly on removal.

        def client_fd_removed(sink, fd, feeder):
            # Called (as a signal callback) when the FD is no longer in
            # use by multifdsink.
            # This will call the registered callable on the fd.
            # Called from GStreamer threads.
            self.debug("cleaning up fd %d", fd)
            feeder.clientDisconnected(fd)

        for feeder in self.feeders.values():
            element = pipeline.get_by_name(feeder.elementName)
            if element:
                element.connect('client-fd-removed', client_fd_removed,
                                feeder)
                self.debug("Connected to client-fd-removed on %r", feeder)
            else:
                self.warning("No feeder %s in pipeline", feeder.elementName)

    def get_pipeline(self):
        return self.pipeline

    def do_pipeline_playing(self):
        """
        Invoked when the pipeline has changed the state to playing.
        The default implementation sets the component's mood to HAPPY.
        """
        self.setMood(moods.happy)

    def make_message_for_gstreamer_error(self, gerror, debug):
        """Make a flumotion error message to show to the user.

        This method may be overridden by components that have special
        knowledge about potential errors. If the component does not know
        about the error, it can chain up to this implementation, which
        will make a generic message.

        @param gerror: The GError from the error message posted on the
                       GStreamer message bus.
        @type  gerror: L{gst.GError}
        @param debug: A string with debugging information.
        @type  debug: str

        @returns: A L{flumotion.common.messages.Message} to show to the
                  user.
        """
        # generate a unique id
        mid = "%s-%s-%d" % (self.name, gerror.domain, gerror.code)
        m = messages.Error(T_(N_(
            "Internal GStreamer error.")),
            debug="%s\n%s: %d\n%s" % (
                gerror.message, gerror.domain, gerror.code, debug),
            mid=mid, priority=40)
        return m
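
    # Override sketch (hypothetical; the domain check shown is an
    # assumption, not taken from this module): a component that recognizes
    # an error can produce a friendlier message and chain up for the rest:
    #
    #   def make_message_for_gstreamer_error(self, gerror, debug):
    #       if gerror.domain == 'gst-resource-error-quark':
    #           return messages.Error(T_(N_(
    #               "Could not read from the source.")), debug=debug)
    #       return FeedComponent.make_message_for_gstreamer_error(
    #           self, gerror, debug)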

    def bus_message_received_cb(self, bus, message):

        def state_changed():
            if src == self.pipeline:
                old, new, pending = message.parse_state_changed()
                self._change_monitor.state_changed(old, new)
                dump_filename = "%s.%s_%s" % (self.name,
                    gst.element_state_get_name(old),
                    gst.element_state_get_name(new))
                self.dump_gstreamer_debug_dot_file(dump_filename, True)

        def error():
            gerror, debug = message.parse_error()
            self.warning('element %s error %s %s',
                         src.get_path_string(), gerror, debug)
            self.setMood(moods.sad)

            # this method can fail if the component has a mistake
            try:
                m = self.make_message_for_gstreamer_error(gerror, debug)
            except Exception, e:
                msg = log.getExceptionMessage(e)
                m = messages.Error(T_(N_(
                    "Programming error in component.")),
                    debug="Bug in %r.make_message_for_gstreamer_error: %s" % (
                        self.__class__, msg))

            self.state.append('messages', m)
            self._change_monitor.have_error(self.pipeline.get_state(),
                                            message)

        def eos():
            name = src.get_name()
            if name in self._pad_monitors:
                self.info('End of stream in element %s', name)
                self._pad_monitors[name].setInactive()
            else:
                self.info("We got an eos from %s", name)

        def default():
            self.log('message received: %r', message)

        handlers = {gst.MESSAGE_STATE_CHANGED: state_changed,
                    gst.MESSAGE_ERROR: error,
                    gst.MESSAGE_EOS: eos}
        t = message.type
        src = message.src
        handlers.get(t, default)()
        return True

    def install_eater_continuity_watch(self, eaterWatchElements):
        """Watch a set of elements for discontinuity messages.

        @param eaterWatchElements: the set of elements to watch for
                                   discontinuities.
        @type  eaterWatchElements: Dict of elementName => Eater.
        """

        def on_element_message(bus, message):
            src = message.src
            name = src.get_name()
            if name in eaterWatchElements:
                eater = eaterWatchElements[name]
                s = message.structure

                def timestampDiscont():
                    prevTs = s["prev-timestamp"]
                    prevDuration = s["prev-duration"]
                    curTs = s["cur-timestamp"]
                    discont = curTs - (prevTs + prevDuration)
                    dSeconds = discont / float(gst.SECOND)
                    self.debug("we have a discont on eater %s of %f s "
                               "between %s and %s ", eater.eaterAlias,
                               dSeconds, gst.TIME_ARGS(prevTs + prevDuration),
                               gst.TIME_ARGS(curTs))
                    eater.timestampDiscont(dSeconds,
                                           float(curTs) / float(gst.SECOND))

                def offsetDiscont():
                    prevOffsetEnd = s["prev-offset-end"]
                    curOffset = s["cur-offset"]
                    discont = curOffset - prevOffsetEnd
                    self.debug("we have a discont on eater %s of %d "
                               "units between %d and %d ",
                               eater.eaterAlias, discont, prevOffsetEnd,
                               curOffset)
                    eater.offsetDiscont(discont, curOffset)

                handlers = {'imperfect-timestamp': timestampDiscont,
                            'imperfect-offset': offsetDiscont}
                if s.get_name() in handlers:
                    handlers[s.get_name()]()

        # we know that there is a signal watch already installed
        bus = self.pipeline.get_bus()
        # never gets cleaned up; does that matter?
        bus.connect("message::element", on_element_message)

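    # Worked example (illustrative numbers): with prev-timestamp = 2s,
    # prev-duration = 1s and cur-timestamp = 4s, timestampDiscont() sees
    # discont = 4s - (2s + 1s) = 1 * gst.SECOND, so it reports a gap of
    # 1.0 s at a current stream time of 4.0 s to the eater.
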
    def install_eater_event_probes(self, eater):

        def fdsrc_event(pad, event):
            # An event probe used to consume unwanted EOS events on eaters.
            # Called from GStreamer threads.
            if event.type == gst.EVENT_EOS:
                self.info('End of stream for eater %s, disconnect will be '
                          'triggered', eater.eaterAlias)
                # We swallow it because otherwise our component acts on the EOS
                # and we can't recover from that later. Instead, fdsrc will be
                # taken out and given a new fd on the next eatFromFD call.
                return False
            return True

        def depay_event(pad, event):
            # An event probe used to consume unwanted duplicate
            # newsegment events.
            # Called from GStreamer threads.
            if event.type == gst.EVENT_NEWSEGMENT:
                # We do this because we know gdppay/gdpdepay screw up on 2nd
                # newsegments (unclear what the original reason for this
                # was, perhaps #349204)
                # Other elements might also have problems with repeated
                # newsegments coming in, so we just drop them all. Flumotion
                # operates in single segment space, so dropping newsegments
                # should be fine.
                if getattr(eater, '_gotFirstNewSegment', False):
                    self.info("Subsequent new segment event received on "
                              "depay on eater %s", eater.eaterAlias)
                    # swallow (gulp)
                    eater.streamheader = []
                    if self.swallowNewSegment:
                        return False
                else:
                    eater._gotFirstNewSegment = True
            return True

        self.debug('adding event probe for eater %s', eater.eaterAlias)
        fdsrc = self.get_element(eater.elementName)
        fdsrc.get_pad("src").add_event_probe(fdsrc_event)
        depay = self.get_element(eater.depayName)
        depay.get_pad("src").add_event_probe(depay_event)

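    # Note on pad probe semantics in GStreamer 0.10: returning False from
    # an event probe drops the event, returning True lets it pass. That is
    # how fdsrc_event() swallows EOS and depay_event() swallows repeated
    # newsegment events above.
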
    def _setup_pipeline(self):
        self.debug('setup_pipeline()')
        assert self.bus_signal_id == None

        self.pipeline.set_name('pipeline-' + self.getName())
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        self.bus_signal_id = bus.connect('message',
                                         self.bus_message_received_cb)
        sig_id = self.pipeline.connect('deep-notify',
                                       gstreamer.verbose_deep_notify_cb, self)
        self.pipeline_signals.append(sig_id)

        # set to ready so that multifdsinks can always receive fds, even
        # if the pipeline has a delayed start due to clock slaving
        self.pipeline.set_state(gst.STATE_READY)

        # start checking feeders, if we have a sufficiently recent multifdsink
        if self._get_stats_supported:
            self._feeder_probe_cl = reactor.callLater(
                self.FEEDER_STATS_UPDATE_FREQUENCY,
                self._feeder_probe_calllater)
        else:
            self.warning("Feeder statistics unavailable, your "
                         "gst-plugins-base is too old")
            m = messages.Warning(T_(N_(
                "Your gst-plugins-base is too old, so "
                "feeder statistics will be unavailable.")),
                mid='multifdsink')
            m.add(T_(N_(
                "Please upgrade '%s' to version %s."), 'gst-plugins-base',
                '0.10.11'))
            self.addMessage(m)

        for eater in self.eaters.values():
            self.install_eater_event_probes(eater)
            pad = self.get_element(eater.elementName).get_pad('src')
            self._pad_monitors.attach(pad, eater.elementName,
                                      padmonitor.EaterPadMonitor,
                                      self.reconnectEater,
                                      eater.eaterAlias)
            eater.setPadMonitor(self._pad_monitors[eater.elementName])

    def stop_pipeline(self):
        if not self.pipeline:
            return

        if self.clock_provider:
            self.clock_provider.set_property('active', False)
            self.clock_provider = None
        retval = self.pipeline.set_state(gst.STATE_NULL)
        if retval != gst.STATE_CHANGE_SUCCESS:
            self.warning('Setting pipeline to NULL failed')

    def cleanup(self):
        self.debug("cleaning up")

        assert self.pipeline != None

        self.stop_pipeline()
        # Disconnect signals
        map(self.pipeline.disconnect, self.pipeline_signals)
        self.pipeline_signals = []
        if self.bus_signal_id:
            self.pipeline.get_bus().disconnect(self.bus_signal_id)
            self.pipeline.get_bus().remove_signal_watch()
            self.bus_signal_id = None
        self.pipeline = None

        if self._feeder_probe_cl:
            self._feeder_probe_cl.cancel()
            self._feeder_probe_cl = None

        # clean up checkEater callLaters
        for eater in self.eaters.values():
            self._pad_monitors.remove(eater.elementName)
            eater.setPadMonitor(None)

    def do_stop(self):
        self.debug('Stopping')
        if self.pipeline:
            self.cleanup()
        self.debug('Stopped')
        return defer.succeed(None)

    def set_master_clock(self, ip, port, base_time):
        self.debug("Master clock set to %s:%d with base_time %s", ip, port,
                   gst.TIME_ARGS(base_time))

        assert self._clock_slaved
        if self._master_clock_info == (ip, port, base_time):
            self.debug("Same master clock info, returning directly")
            return defer.succeed(None)
        elif self._master_clock_info:
            self.stop_pipeline()

        self._master_clock_info = ip, port, base_time

        clock = gst.NetClientClock(None, ip, port, base_time)
        # disable the pipeline's management of base_time -- we're going
        # to set it ourselves.
        self.pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE)
        self.pipeline.set_base_time(base_time)
        self.pipeline.use_clock(clock)

        self.try_start_pipeline()

    def get_master_clock(self):
        """
        Return the connection details for the network clock provided by
        this component, if any.
        """
        if self.clock_provider:
            ip, port, base_time = self._master_clock_info
            return ip, port, base_time
        else:
            return None

    def provide_master_clock(self, port):
        """
        Tell the component to provide a master clock on the given port.

        @returns: a deferred firing a (ip, port, base_time) triple.
        """

        def pipelinePaused(r):
            clock = self.pipeline.get_clock()
            # make sure the pipeline sticks with this clock
            self.pipeline.use_clock(clock)

            self.clock_provider = gst.NetTimeProvider(clock, None, port)
            realport = self.clock_provider.get_property('port')

            base_time = self.pipeline.get_base_time()

            self.debug('provided master clock from %r, base time %s',
                       clock, gst.TIME_ARGS(base_time))

            if self.medium:
                # FIXME: This isn't always correct. We need a more
                # flexible API, and a proper network map, to do this.
                # Even then, it's not always going to be possible.
                ip = self.medium.getIP()
            else:
                ip = "127.0.0.1"

            self._master_clock_info = (ip, realport, base_time)
            return self.get_master_clock()

        assert self.pipeline
        assert not self._clock_slaved
        (ret, state, pending) = self.pipeline.get_state(0)
        if state != gst.STATE_PAUSED and state != gst.STATE_PLAYING:
            self.debug("pipeline still spinning up: %r", state)
            d = self._change_monitor.add(gst.STATE_CHANGE_READY_TO_PAUSED)
            d.addCallback(pipelinePaused)
            return d
        elif self.clock_provider:
            self.debug("returning existing master clock info")
            return defer.succeed(self.get_master_clock())
        else:
            return defer.maybeDeferred(pipelinePaused, None)

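    # Flow sketch (hypothetical caller; the manager normally drives this):
    # one component provides the clock and the resulting triple is pushed
    # to the clock-slaved components:
    #
    #   d = master.provide_master_clock(3102)
    #
    #   def distribute(info):
    #       ip, port, base_time = info
    #       for slave in slaved_components:
    #           slave.set_master_clock(ip, port, base_time)
    #   d.addCallback(distribute)
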
    def dump_gstreamer_debug_dot_file(self, filename, with_timestamp=False):
        """
        Dumps a graphviz dot file of the pipeline's current state to disk.
        This will only actually do anything if the environment variable
        GST_DEBUG_DUMP_DOT_DIR is set.

        @param filename: filename to store
        @param with_timestamp: if True, then timestamp will be prepended to
                               filename
        """
        if hasattr(gst, "DEBUG_BIN_TO_DOT_FILE"):
            method = gst.DEBUG_BIN_TO_DOT_FILE
            if with_timestamp:
                method = gst.DEBUG_BIN_TO_DOT_FILE_WITH_TS
            method(self.pipeline, gst.DEBUG_GRAPH_SHOW_ALL, filename)
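
    # Usage sketch: set GST_DEBUG_DUMP_DOT_DIR in the worker's environment
    # before starting it, then render a dumped file with graphviz, e.g.:
    #
    #   GST_DEBUG_DUMP_DOT_DIR=/tmp flumotion-worker ...
    #   dot -Tpng /tmp/<dump>.dot -o pipeline.png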

    ### BaseComponent interface implementation

    def try_start_pipeline(self, force=False):
        """
        Tell the component to start.
        Whatever is using the component is responsible for making sure all
        eaters have received their file descriptor to eat from.
        """
        (ret, state, pending) = self.pipeline.get_state(0)
        if state == gst.STATE_PLAYING:
            self.log('already PLAYING')
            if not force:
                return
            self.debug('pipeline PLAYING, but starting anyway as requested')

        if self._clock_slaved and not self._master_clock_info:
            self.debug("Missing master clock info, deferring set to PLAYING")
            return

        for eater in self.eaters.values():
            if not eater.fd:
                self.debug('eater %s not yet connected, deferring set to '
                           'PLAYING', eater.eaterAlias)
                return

        self.debug("Setting pipeline %r to GST_STATE_PLAYING", self.pipeline)
        self.pipeline.set_state(gst.STATE_PLAYING)
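
    # try_start_pipeline() is deliberately re-entrant: it declines to go
    # to PLAYING until a slaved clock has its master info and every eater
    # has an fd, and each later trigger (set_master_clock, eatFromFD)
    # simply calls it again.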

    def _feeder_probe_calllater(self):
        for feedId, feeder in self.feeders.items():
            feederElement = self.get_element(feeder.elementName)
            for client in feeder.getClients():
                # a currently disconnected client will have fd None
                if client.fd is not None:
                    array = feederElement.emit('get-stats', client.fd)
                    if len(array) == 0:
                        # There is an unavoidable race here: we can't know
                        # whether the fd has been removed from multifdsink.
                        # However, if we call get-stats on an fd that
                        # multifdsink doesn't know about, we just get a
                        # 0-length array. We ensure that we don't reuse
                        # the FD too soon so this can't result in calling
                        # this on a valid but WRONG fd
                        self.debug('Feeder element for feed %s does not know '
                                   'client fd %d' % (feedId, client.fd))
                    else:
                        client.setStats(array)
        self._feeder_probe_cl = reactor.callLater(
            self.FEEDER_STATS_UPDATE_FREQUENCY,
            self._feeder_probe_calllater)

    def unblock_eater(self, eaterAlias):
        """
        After this function returns, the stream lock for this eater must have
        been released. If your component needs to do something here, override
        this method.
        """
        pass

    def get_element(self, element_name):
        """Get an element out of the pipeline.

        If it is possible that the component has not yet been set up,
        the caller needs to check if self.pipeline is actually set.
        """
        assert self.pipeline
        self.log('Looking up element %r in pipeline %r',
                 element_name, self.pipeline)
        element = self.pipeline.get_by_name(element_name)
        if not element:
            self.warning("No element named %r in pipeline", element_name)
        return element

    def get_element_property(self, element_name, property):
        'Gets a property of an element in the GStreamer pipeline.'
        self.debug("%s: getting property %s of element %s" % (
            self.getName(), property, element_name))
        element = self.get_element(element_name)
        if not element:
            msg = "Element '%s' does not exist" % element_name
            self.warning(msg)
            raise errors.PropertyError(msg)

        self.debug('getting property %s on element %s' % (
            property, element_name))
        try:
            value = element.get_property(property)
        except (ValueError, TypeError):
            msg = "Property '%s' on element '%s' does not exist" % (
                property, element_name)
            self.warning(msg)
            raise errors.PropertyError(msg)

        # param enums and enums need to be returned by integer value
        if isinstance(value, gobject.GEnum):
            value = int(value)

        return value

    def set_element_property(self, element_name, property, value):
        'Sets a property on an element in the GStreamer pipeline.'
        self.debug("%s: setting property %s of element %s to %s" % (
            self.getName(), property, element_name, value))
        element = self.get_element(element_name)
        if not element:
            msg = "Element '%s' does not exist" % element_name
            self.warning(msg)
            raise errors.PropertyError(msg)

        self.debug('setting property %s on element %r to %s' %
                   (property, element_name, value))
        pygobject.gobject_set_property(element, property, value)
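
    # Usage sketch (hypothetical element and property names):
    #
    #   bitrate = comp.get_element_property('muxer', 'bitrate')
    #   comp.set_element_property('muxer', 'bitrate', 400000)
    #
    # get_element_property raises errors.PropertyError when the element or
    # property is missing and flattens GEnum values to plain ints;
    # set_element_property delegates the actual set to
    # pygobject.gobject_set_property.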

    ### methods to connect component eaters and feeders

    def reconnectEater(self, eaterAlias):
        if not self.medium:
            self.debug("Can't reconnect eater %s, running "
                       "without a medium", eaterAlias)
            return

        self.eaters[eaterAlias].disconnected()
        self.medium.connectEater(eaterAlias)

    def feedToFD(self, feedName, fd, cleanup, eaterId=None):
        """
        @param feedName: name of the feed to feed to the given fd.
        @type  feedName: str
        @param fd: the file descriptor to feed to
        @type  fd: int
        @param cleanup: the function to call when the FD is no longer feeding
        @type  cleanup: callable
        """
        self.debug('FeedToFD(%s, %d)', feedName, fd)

        # We must have a pipeline in READY or above to do this. Do a
        # non-blocking (zero timeout) get_state.
        if (not self.pipeline or
            self.pipeline.get_state(0)[1] == gst.STATE_NULL):
            self.warning('told to feed %s to fd %d, but pipeline not '
                         'running yet', feedName, fd)
            cleanup(fd)
            # can happen if we are restarting but the other component is
            # happy; assume other side will reconnect later
            return

        if feedName not in self.feeders:
            msg = "Cannot find feeder named '%s'" % feedName
            mid = "feedToFD-%s" % feedName
            m = messages.Warning(T_(N_("Internal Flumotion error.")),
                                 debug=msg, mid=mid, priority=40)
            self.state.append('messages', m)
            self.warning(msg)
            cleanup(fd)
            return False

        feeder = self.feeders[feedName]
        element = self.get_element(feeder.elementName)
        assert element
        clientId = eaterId or ('client-%d' % fd)
        element.emit('add', fd)
        feeder.clientConnected(clientId, fd, cleanup)
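
    # Usage sketch (hypothetical socket): the feed server typically passes
    # an accepted connection's fd; cleanup is invoked with that fd once
    # multifdsink no longer uses it:
    #
    #   def _cleanup(fd):
    #       os.close(fd)
    #
    #   component.feedToFD('default', sock.fileno(), _cleanup)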

    def eatFromFD(self, eaterAlias, feedId, fd):
        """
        Tell the component to eat the given feedId from the given fd.
        The component takes over the ownership of the fd, closing it when
        no longer eating.

        @param eaterAlias: the alias of the eater
        @type  eaterAlias: str
        @param feedId: feed id (componentName:feedName) to eat from through
                       the given fd
        @type  feedId: str
        @param fd: the file descriptor to eat from
        @type  fd: int
        """
        self.debug('EatFromFD(%s, %s, %d)', eaterAlias, feedId, fd)

        if not self.pipeline:
            self.warning('told to eat %s from fd %d, but pipeline not '
                         'running yet', feedId, fd)
            # can happen if we are restarting but the other component is
            # happy; assume other side will reconnect later
            os.close(fd)
            return

        if eaterAlias not in self.eaters:
            self.warning('Unknown eater alias: %s', eaterAlias)
            os.close(fd)
            return

        eater = self.eaters[eaterAlias]
        element = self.get_element(eater.elementName)
        if not element:
            self.warning('Eater element %s not found', eater.elementName)
            os.close(fd)
            return

        # fdsrc only switches to the new fd in ready or below
        (result, current, pending) = element.get_state(0L)
        pipeline_playing = current not in [gst.STATE_NULL, gst.STATE_READY]
        if pipeline_playing:
            self.debug('eater %s in state %r, kidnapping it',
                       eaterAlias, current)

            # we unlink fdsrc from its peer, take it out of the pipeline
            # so we can set it to READY without having it send EOS,
            # then switch fd and put it back in.
            # To do this safely, we first block fdsrc:src, then let the
            # component do any necessary unlocking (needed for multi-input
            # elements)
            srcpad = element.get_pad('src')

            def _block_cb(pad, blocked):
                pass
            srcpad.set_blocked_async(True, _block_cb)
            # add buffer probe to drop buffers that are flagged as IN_CAPS
            # needs to be done to gdpdepay's src pad
            depay = self.get_element(eater.depayName)

            def remove_in_caps_buffers(pad, buffer, eater):
                if buffer.flag_is_set(gst.BUFFER_FLAG_IN_CAPS):
                    if self.keepStreamheaderForLater:
                        self.log("We got buffer with IN_CAPS which we are "
                                 "keeping for later %r", eater)
                        eater.streamheader.append(buffer)
                        return False
                    self.info("We got streamheader buffer which "
                              "we are dropping because we do not want this "
                              "just after a reconnect because it breaks "
                              "everything")
                    return False

                # now we have a buffer with no flag set
                # we should remove the handler
                self.log("We got buffer with no in caps flag set on "
                         "eater %r", eater)
                if eater.streamheaderBufferProbeHandler:
                    self.log("Removing buffer probe on depay src pad on "
                             "eater %r", eater)
                    pad.remove_buffer_probe(
                        eater.streamheaderBufferProbeHandler)
                    eater.streamheaderBufferProbeHandler = None
                else:
                    self.warning("buffer probe handler is None, bad news on "
                                 "eater %r", eater)

                if not self.dropStreamHeaders:
                    self.log("Pushing earlier buffers with IN_CAPS flag")
                    for buff in eater.streamheader:
                        pad.push(buff)
                    self.dropStreamHeaders = True

                eater.streamheader = []
                return True

            if not eater.streamheaderBufferProbeHandler:
                self.log("Adding buffer probe on depay src pad on "
                         "eater %r", eater)
                eater.streamheaderBufferProbeHandler = \
                    depay.get_pad("src").add_buffer_probe(
                        remove_in_caps_buffers, eater)

            self.unblock_eater(eaterAlias)

            # Now, we can switch FD with this mess
            sinkpad = srcpad.get_peer()
            srcpad.unlink(sinkpad)
            parent = element.get_parent()
            parent.remove(element)
            self.log("setting to ready")
            element.set_state(gst.STATE_READY)
            self.log("setting to ready complete!!!")
            old = element.get_property('fd')
            self.log("Closing old fd %d", old)
            os.close(old)
            element.set_property('fd', fd)
            parent.add(element)
            srcpad.link(sinkpad)
            element.set_state(gst.STATE_PLAYING)
            # We're done; unblock the pad
            srcpad.set_blocked_async(False, _block_cb)
        else:
            element.set_property('fd', fd)

        # update our eater uiState, saying that we are eating from a
        # possibly new feedId
        eater.connected(fd, feedId)

        if not pipeline_playing:
            self.try_start_pipeline()
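
    # Usage sketch (hypothetical fd): the medium hands over a connected fd
    # after negotiating with the remote feeder; the component takes
    # ownership and will close it when done:
    #
    #   component.eatFromFD('default', 'producer:video', sock.fileno())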