1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import gst
23 import gobject
24
25 import os
26 import time
27
28 from twisted.internet import reactor, defer
29
30 from flumotion.common import common, errors, pygobject, messages, log
31 from flumotion.common import gstreamer
32 from flumotion.common.i18n import N_, gettexter
33 from flumotion.common.planet import moods
34 from flumotion.component import component as basecomponent
35 from flumotion.component import feed, padmonitor
36 from flumotion.component.feeder import Feeder
37 from flumotion.component.eater import Eater
38
39 __version__ = "$Rev: 8800 $"
40 T_ = gettexter()
41
42
44 """
45 I am a base class for all Flumotion feed components.
46 """
47
48
49 FEEDER_STATS_UPDATE_FREQUENCY = 12.5
50 keepStreamheaderForLater = False
51 dropStreamHeaders = True
52 swallowNewSegment = True
53
54 logCategory = 'feedcomponent'
55
56
57
59
60 self.feeders = {}
61 self.eaters = {}
62 self.uiState.addListKey('feeders')
63 self.uiState.addListKey('eaters')
64
65 self.pipeline = None
66 self.pipeline_signals = []
67 self.bus_signal_id = None
68 self.effects = {}
69 self._feeder_probe_cl = None
70
71 self._pad_monitors = padmonitor.PadMonitorSet(
72 lambda: self.setMood(moods.happy),
73 lambda: self.setMood(moods.hungry))
74
75 self._clock_slaved = False
76 self.clock_provider = None
77 self._master_clock_info = None
78
79
80 self._change_monitor = gstreamer.StateChangeMonitor()
81
82
83 self._get_stats_supported = (gstreamer.get_plugin_version('tcp')
84 >= (0, 10, 11, 0))
85
87 """
88 Sets up component.
89
90 Invokes the L{create_pipeline} and L{set_pipeline} vmethods,
91 which subclasses can provide.
92 """
93 config = self.config
94 eater_config = config.get('eater', {})
95 feeder_config = config.get('feed', [])
96 source_config = config.get('source', [])
97
98 self.debug("FeedComponent.do_setup(): eater_config %r", eater_config)
99 self.debug("FeedComponent.do_setup(): feeder_config %r", feeder_config)
100 self.debug("FeedComponent.do_setup(): source_config %r", source_config)
101
102
103
104 if eater_config == {} and source_config != []:
105 eater_config = {'default': [(x, 'default') for x in source_config]}
106
107 for eaterName in eater_config:
108 for feedId, eaterAlias in eater_config[eaterName]:
109 self.eaters[eaterAlias] = Eater(eaterAlias, eaterName)
110 self.uiState.append('eaters', self.eaters[eaterAlias].uiState)
111
112 for feederName in feeder_config:
113 self.feeders[feederName] = Feeder(feederName)
114 self.uiState.append('feeders',
115 self.feeders[feederName].uiState)
116
117 clockMaster = config.get('clock-master', None)
118 if clockMaster:
119 self._clock_slaved = clockMaster != config['avatarId']
120 else:
121 self._clock_slaved = False
122
123 pipeline = self.create_pipeline()
124 self.connect_feeders(pipeline)
125 self.set_pipeline(pipeline)
126
127 self.debug("FeedComponent.do_setup(): setup finished")
128
129 self.try_start_pipeline()
130
131
132 d = self._change_monitor.add(gst.STATE_CHANGE_PAUSED_TO_PLAYING)
133 d.addCallback(lambda x: self.do_pipeline_playing())
134
136
137
138 self.debug("Setup completed")
139
140
141
143 """
144 Subclasses have to implement this method.
145
146 @rtype: L{gst.Pipeline}
147 """
148 raise NotImplementedError(
149 "subclass must implement create_pipeline")
150
160
162 elementName = self.feeders[feederName].payName
163 element = self.pipeline.get_by_name(elementName)
164 if not element:
165 raise errors.ComponentError("No such feeder %s" % feederName)
166
167 pad = element.get_pad('src')
168 self._pad_monitors.attach(pad, elementName)
169
170
171
175
177
178
179
180 def client_fd_removed(sink, fd, feeder):
181
182
183
184
185 self.debug("cleaning up fd %d", fd)
186 feeder.clientDisconnected(fd)
187
188 for feeder in self.feeders.values():
189 element = pipeline.get_by_name(feeder.elementName)
190 if element:
191 element.connect('client-fd-removed', client_fd_removed,
192 feeder)
193 self.debug("Connected to client-fd-removed on %r", feeder)
194 else:
195 self.warning("No feeder %s in pipeline", feeder.elementName)
196
199
201 """
202 Invoked when the pipeline has changed the state to playing.
203 The default implementation sets the component's mood to HAPPY.
204 """
205 self.setMood(moods.happy)
206
208 """Make a flumotion error message to show to the user.
209
210 This method may be overridden by components that have special
211 knowledge about potential errors. If the component does not know
212 about the error, it can chain up to this implementation, which
213 will make a generic message.
214
215 @param gerror: The GError from the error message posted on the
216 GStreamer message bus.
217 @type gerror: L{gst.GError}
218 @param debug: A string with debugging information.
219 @type debug: str
220
221 @returns: A L{flumotion.common.messages.Message} to show to the
222 user.
223 """
224
225 mid = "%s-%s-%d" % (self.name, gerror.domain, gerror.code)
226 m = messages.Error(T_(N_(
227 "Internal GStreamer error.")),
228 debug="%s\n%s: %d\n%s" % (
229 gerror.message, gerror.domain, gerror.code, debug),
230 mid=mid, priority=40)
231 return m
232
243
244 def error():
245 gerror, debug = message.parse_error()
246 self.warning('element %s error %s %s',
247 src.get_path_string(), gerror, debug)
248 self.setMood(moods.sad)
249
250
251 try:
252 m = self.make_message_for_gstreamer_error(gerror, debug)
253 except Exception, e:
254 msg = log.getExceptionMessage(e)
255 m = messages.Error(T_(N_(
256 "Programming error in component.")),
257 debug="Bug in %r.make_message_for_gstreamer_error: %s" % (
258 self.__class__, msg))
259
260 self.state.append('messages', m)
261 self._change_monitor.have_error(self.pipeline.get_state(),
262 message)
263
264 def eos():
265 name = src.get_name()
266 if name in self._pad_monitors:
267 self.info('End of stream in element %s', name)
268 self._pad_monitors[name].setInactive()
269 else:
270 self.info("We got an eos from %s", name)
271
272 def default():
273 self.log('message received: %r', message)
274
275 handlers = {gst.MESSAGE_STATE_CHANGED: state_changed,
276 gst.MESSAGE_ERROR: error,
277 gst.MESSAGE_EOS: eos}
278 t = message.type
279 src = message.src
280 handlers.get(t, default)()
281 return True
282
284 """Watch a set of elements for discontinuity messages.
285
286 @param eaterWatchElements: the set of elements to watch for
287 discontinuities.
288 @type eaterWatchElements: Dict of elementName => Eater.
289 """
290
291 def on_element_message(bus, message):
292 src = message.src
293 name = src.get_name()
294 if name in eaterWatchElements:
295 eater = eaterWatchElements[name]
296 s = message.structure
297
298 def timestampDiscont():
299 prevTs = s["prev-timestamp"]
300 prevDuration = s["prev-duration"]
301 curTs = s["cur-timestamp"]
302 discont = curTs - (prevTs + prevDuration)
303 dSeconds = discont / float(gst.SECOND)
304 self.debug("we have a discont on eater %s of %f s "
305 "between %s and %s ", eater.eaterAlias,
306 dSeconds, gst.TIME_ARGS(prevTs + prevDuration),
307 gst.TIME_ARGS(curTs))
308 eater.timestampDiscont(dSeconds,
309 float(curTs) / float(gst.SECOND))
310
311 def offsetDiscont():
312 prevOffsetEnd = s["prev-offset-end"]
313 curOffset = s["cur-offset"]
314 discont = curOffset - prevOffsetEnd
315 self.debug("we have a discont on eater %s of %d "
316 "units between %d and %d ",
317 eater.eaterAlias, discont, prevOffsetEnd,
318 curOffset)
319 eater.offsetDiscont(discont, curOffset)
320
321 handlers = {'imperfect-timestamp': timestampDiscont,
322 'imperfect-offset': offsetDiscont}
323 if s.get_name() in handlers:
324 handlers[s.get_name()]()
325
326
327 bus = self.pipeline.get_bus()
328
329 bus.connect("message::element", on_element_message)
330
332
333 def fdsrc_event(pad, event):
334
335
336 if event.type == gst.EVENT_EOS:
337 self.info('End of stream for eater %s, disconnect will be '
338 'triggered', eater.eaterAlias)
339
340
341
342 return False
343 return True
344
345 def depay_event(pad, event):
346
347
348
349 if event.type == gst.EVENT_NEWSEGMENT:
350
351
352
353
354
355
356
357 if getattr(eater, '_gotFirstNewSegment', False):
358 self.info("Subsequent new segment event received on "
359 "depay on eater %s", eater.eaterAlias)
360
361 eater.streamheader = []
362 if self.swallowNewSegment:
363 return False
364 else:
365 eater._gotFirstNewSegment = True
366 return True
367
368 self.debug('adding event probe for eater %s', eater.eaterAlias)
369 fdsrc = self.get_element(eater.elementName)
370 fdsrc.get_pad("src").add_event_probe(fdsrc_event)
371 depay = self.get_element(eater.depayName)
372 depay.get_pad("src").add_event_probe(depay_event)
373
375 self.debug('setup_pipeline()')
376 assert self.bus_signal_id == None
377
378 self.pipeline.set_name('pipeline-' + self.getName())
379 bus = self.pipeline.get_bus()
380 bus.add_signal_watch()
381 self.bus_signal_id = bus.connect('message',
382 self.bus_message_received_cb)
383 sig_id = self.pipeline.connect('deep-notify',
384 gstreamer.verbose_deep_notify_cb, self)
385 self.pipeline_signals.append(sig_id)
386
387
388
389 self.pipeline.set_state(gst.STATE_READY)
390
391
392 if self._get_stats_supported:
393 self._feeder_probe_cl = reactor.callLater(
394 self.FEEDER_STATS_UPDATE_FREQUENCY,
395 self._feeder_probe_calllater)
396 else:
397 self.warning("Feeder statistics unavailable, your "
398 "gst-plugins-base is too old")
399 m = messages.Warning(T_(N_(
400 "Your gst-plugins-base is too old, so "
401 "feeder statistics will be unavailable.")),
402 mid='multifdsink')
403 m.add(T_(N_(
404 "Please upgrade '%s' to version %s."), 'gst-plugins-base',
405 '0.10.11'))
406 self.addMessage(m)
407
408 for eater in self.eaters.values():
409 self.install_eater_event_probes(eater)
410 pad = self.get_element(eater.elementName).get_pad('src')
411 self._pad_monitors.attach(pad, eater.elementName,
412 padmonitor.EaterPadMonitor,
413 self.reconnectEater,
414 eater.eaterAlias)
415 eater.setPadMonitor(self._pad_monitors[eater.elementName])
416
418 if not self.pipeline:
419 return
420
421 if self.clock_provider:
422 self.clock_provider.set_property('active', False)
423 self.clock_provider = None
424 retval = self.pipeline.set_state(gst.STATE_NULL)
425 if retval != gst.STATE_CHANGE_SUCCESS:
426 self.warning('Setting pipeline to NULL failed')
427
429 self.debug("cleaning up")
430
431 assert self.pipeline != None
432
433 self.stop_pipeline()
434
435 map(self.pipeline.disconnect, self.pipeline_signals)
436 self.pipeline_signals = []
437 if self.bus_signal_id:
438 self.pipeline.get_bus().disconnect(self.bus_signal_id)
439 self.pipeline.get_bus().remove_signal_watch()
440 self.bus_signal_id = None
441 self.pipeline = None
442
443 if self._feeder_probe_cl:
444 self._feeder_probe_cl.cancel()
445 self._feeder_probe_cl = None
446
447
448 for eater in self.eaters.values():
449 self._pad_monitors.remove(eater.elementName)
450 eater.setPadMonitor(None)
451
458
460 self.debug("Master clock set to %s:%d with base_time %s", ip, port,
461 gst.TIME_ARGS(base_time))
462
463 assert self._clock_slaved
464 if self._master_clock_info == (ip, port, base_time):
465 self.debug("Same master clock info, returning directly")
466 return defer.succeed(None)
467 elif self._master_clock_info:
468 self.stop_pipeline()
469
470 self._master_clock_info = ip, port, base_time
471
472 clock = gst.NetClientClock(None, ip, port, base_time)
473
474
475 self.pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE)
476 self.pipeline.set_base_time(base_time)
477 self.pipeline.use_clock(clock)
478
479 self.try_start_pipeline()
480
482 """
483 Return the connection details for the network clock provided by
484 this component, if any.
485 """
486 if self.clock_provider:
487 ip, port, base_time = self._master_clock_info
488 return ip, port, base_time
489 else:
490 return None
491
493 """
494 Tell the component to provide a master clock on the given port.
495
496 @returns: a deferred firing a (ip, port, base_time) triple.
497 """
498
499 def pipelinePaused(r):
500 clock = self.pipeline.get_clock()
501
502 self.pipeline.use_clock(clock)
503
504 self.clock_provider = gst.NetTimeProvider(clock, None, port)
505 realport = self.clock_provider.get_property('port')
506
507 base_time = self.pipeline.get_base_time()
508
509 self.debug('provided master clock from %r, base time %s',
510 clock, gst.TIME_ARGS(base_time))
511
512 if self.medium:
513
514
515
516 ip = self.medium.getIP()
517 else:
518 ip = "127.0.0.1"
519
520 self._master_clock_info = (ip, realport, base_time)
521 return self.get_master_clock()
522
523 assert self.pipeline
524 assert not self._clock_slaved
525 (ret, state, pending) = self.pipeline.get_state(0)
526 if state != gst.STATE_PAUSED and state != gst.STATE_PLAYING:
527 self.debug("pipeline still spinning up: %r", state)
528 d = self._change_monitor.add(gst.STATE_CHANGE_READY_TO_PAUSED)
529 d.addCallback(pipelinePaused)
530 return d
531 elif self.clock_provider:
532 self.debug("returning existing master clock info")
533 return defer.succeed(self.get_master_clock())
534 else:
535 return defer.maybeDeferred(pipelinePaused, None)
536
538 """
539 Dumps a graphviz dot file of the pipeline's current state to disk.
540 This will only actually do anything if the environment variable
541 GST_DEBUG_DUMP_DOT_DIR is set.
542
543 @param filename: filename to store
544 @param with_timestamp: if True, then timestamp will be prepended to
545 filename
546 """
547 if hasattr(gst, "DEBUG_BIN_TO_DOT_FILE"):
548 method = gst.DEBUG_BIN_TO_DOT_FILE
549 if with_timestamp:
550 method = gst.DEBUG_BIN_TO_DOT_FILE_WITH_TS
551 method(self.pipeline, gst.DEBUG_GRAPH_SHOW_ALL, filename)
552
553
554
556 """
557 Tell the component to start.
558 Whatever is using the component is responsible for making sure all
559 eaters have received their file descriptor to eat from.
560 """
561 (ret, state, pending) = self.pipeline.get_state(0)
562 if state == gst.STATE_PLAYING:
563 self.log('already PLAYING')
564 if not force:
565 return
566 self.debug('pipeline PLAYING, but starting anyway as requested')
567
568 if self._clock_slaved and not self._master_clock_info:
569 self.debug("Missing master clock info, deferring set to PLAYING")
570 return
571
572 for eater in self.eaters.values():
573 if not eater.fd:
574 self.debug('eater %s not yet connected, deferring set to '
575 'PLAYING', eater.eaterAlias)
576 return
577
578 self.debug("Setting pipeline %r to GST_STATE_PLAYING", self.pipeline)
579 self.pipeline.set_state(gst.STATE_PLAYING)
580
603
605 """
606 After this function returns, the stream lock for this eater must have
607 been released. If your component needs to do something here, override
608 this method.
609 """
610 pass
611
613 """Get an element out of the pipeline.
614
615 If it is possible that the component has not yet been set up,
616 the caller needs to check if self.pipeline is actually set.
617 """
618 assert self.pipeline
619 self.log('Looking up element %r in pipeline %r',
620 element_name, self.pipeline)
621 element = self.pipeline.get_by_name(element_name)
622 if not element:
623 self.warning("No element named %r in pipeline", element_name)
624 return element
625
627 'Gets a property of an element in the GStreamer pipeline.'
628 self.debug("%s: getting property %s of element %s" % (
629 self.getName(), property, element_name))
630 element = self.get_element(element_name)
631 if not element:
632 msg = "Element '%s' does not exist" % element_name
633 self.warning(msg)
634 raise errors.PropertyError(msg)
635
636 self.debug('getting property %s on element %s' % (
637 property, element_name))
638 try:
639 value = element.get_property(property)
640 except (ValueError, TypeError):
641 msg = "Property '%s' on element '%s' does not exist" % (
642 property, element_name)
643 self.warning(msg)
644 raise errors.PropertyError(msg)
645
646
647 if isinstance(value, gobject.GEnum):
648 value = int(value)
649
650 return value
651
653 'Sets a property on an element in the GStreamer pipeline.'
654 self.debug("%s: setting property %s of element %s to %s" % (
655 self.getName(), property, element_name, value))
656 element = self.get_element(element_name)
657 if not element:
658 msg = "Element '%s' does not exist" % element_name
659 self.warning(msg)
660 raise errors.PropertyError(msg)
661
662 self.debug('setting property %s on element %r to %s' %
663 (property, element_name, value))
664 pygobject.gobject_set_property(element, property, value)
665
666
667
669 if not self.medium:
670 self.debug("Can't reconnect eater %s, running "
671 "without a medium", eaterAlias)
672 return
673
674 self.eaters[eaterAlias].disconnected()
675 self.medium.connectEater(eaterAlias)
676
def feedToFD(self, feedName, fd, cleanup, eaterId=None):
    """
    Start feeding the named feed to the given file descriptor.

    Whenever feeding cannot be started, C{cleanup} is called with C{fd}
    straight away so the descriptor is never left dangling.

    @param feedName: name of the feed to feed to the given fd.
    @type  feedName: str
    @param fd:       the file descriptor to feed to
    @type  fd:       int
    @param cleanup:  the function to call when the FD is no longer feeding
    @type  cleanup:  callable
    @param eaterId:  identifier to register the client under; when not
                     given, 'client-<fd>' is used.
    @type  eaterId:  str or None

    @returns: None when the pipeline is not usable yet or when the fd
              was handed to the feeder; False when the feed is unknown
              or its element is missing from the pipeline.
    """
    self.debug('FeedToFD(%s, %d)', feedName, fd)

    # The pipeline has to exist and be out of NULL before an fd can be
    # added; get_state(0) is a zero-timeout query of the current state.
    if (not self.pipeline or
        self.pipeline.get_state(0)[1] == gst.STATE_NULL):
        self.warning('told to feed %s to fd %d, but pipeline not '
                     'running yet', feedName, fd)
        cleanup(fd)
        return

    if feedName not in self.feeders:
        msg = "Cannot find feeder named '%s'" % feedName
        mid = "feedToFD-%s" % feedName
        m = messages.Warning(T_(N_("Internal Flumotion error.")),
            debug=msg, mid=mid, priority=40)
        self.state.append('messages', m)
        self.warning(msg)
        cleanup(fd)
        return False

    feeder = self.feeders[feedName]
    element = self.get_element(feeder.elementName)
    if not element:
        # An assert here would disappear under -O and would never
        # release the fd; treat it like the unknown-feeder case instead.
        self.warning('No feeder element %s in pipeline, not feeding '
                     'fd %d', feeder.elementName, fd)
        cleanup(fd)
        return False

    clientId = eaterId or ('client-%d' % fd)
    element.emit('add', fd)
    feeder.clientConnected(clientId, fd, cleanup)
715
def eatFromFD(self, eaterAlias, feedId, fd):
    """
    Tell the component to eat the given feedId from the given fd.
    The component takes over the ownership of the fd, closing it when
    no longer eating.

    When the eater element is still in NULL or READY, the fd is simply
    set on it and an attempt is made to start the pipeline.  When the
    element is in any other state it is switched over to the new fd in
    place: its src pad is blocked, the element is unlinked and taken out
    of its parent, set to READY, given the new fd (the old one is
    closed), then put back, relinked and set to PLAYING.

    If there is no pipeline, the alias is unknown or the eater element
    cannot be found, the fd is closed and nothing else happens.

    @param eaterAlias: the alias of the eater
    @type  eaterAlias: str
    @param feedId:     feed id (componentName:feedName) to eat from through
                       the given fd
    @type  feedId:     str
    @param fd:         the file descriptor to eat from
    @type  fd:         int
    """
    self.debug('EatFromFD(%s, %s, %d)', eaterAlias, feedId, fd)

    if not self.pipeline:
        self.warning('told to eat %s from fd %d, but pipeline not '
                     'running yet', feedId, fd)
        # We own the fd at this point, so close it instead of leaking it.
        os.close(fd)
        return

    if eaterAlias not in self.eaters:
        self.warning('Unknown eater alias: %s', eaterAlias)
        os.close(fd)
        return

    eater = self.eaters[eaterAlias]
    element = self.get_element(eater.elementName)
    if not element:
        self.warning('Eater element %s not found', eater.elementName)
        os.close(fd)
        return

    # Zero-timeout state query, spelled as a plain int like the other
    # get_state(0) calls in this file (no Python-2-only long literal).
    (result, current, pending) = element.get_state(0)
    pipeline_playing = current not in [gst.STATE_NULL, gst.STATE_READY]
    if pipeline_playing:
        self.debug('eater %s in state %r, kidnapping it',
                   eaterAlias, current)

        # Block the source pad before the element is pulled out of the
        # pipeline.
        srcpad = element.get_pad('src')

        def _block_cb(pad, blocked):
            # Nothing to do on (un)block completion.
            pass
        srcpad.set_blocked_async(True, _block_cb)

        depay = self.get_element(eater.depayName)

        def remove_in_caps_buffers(pad, buffer, eater):
            """
            Buffer probe for the depay src pad, used after a reconnect.

            Buffers flagged IN_CAPS are refused (optionally remembered in
            eater.streamheader when keepStreamheaderForLater is set).
            The first buffer without that flag removes this probe again.
            """
            if buffer.flag_is_set(gst.BUFFER_FLAG_IN_CAPS):
                if self.keepStreamheaderForLater:
                    self.log("We got buffer with IN_CAPS which we are "
                             "keeping for later %r", eater)
                    eater.streamheader.append(buffer)
                    return False
                self.info("We got streamheader buffer which "
                    "we are dropping because we do not want this just "
                    "after a reconnect because it breaks everything ")
                return False

            # First buffer without IN_CAPS: the probe has done its job.
            self.log("We got buffer with no in caps flag set on "
                     "eater %r", eater)
            if eater.streamheaderBufferProbeHandler:
                self.log("Removing buffer probe on depay src pad on "
                         "eater %r", eater)
                pad.remove_buffer_probe(
                    eater.streamheaderBufferProbeHandler)
                eater.streamheaderBufferProbeHandler = None
            else:
                self.warning("buffer probe handler is None, bad news on "
                             "eater %r", eater)

            if not self.dropStreamHeaders:
                self.log("Pushing earlier buffers with IN_CAPS flag")
                for buff in eater.streamheader:
                    pad.push(buff)
                self.dropStreamHeaders = True

            eater.streamheader = []
            return True

        # Install the probe only once per eater.
        if not eater.streamheaderBufferProbeHandler:
            self.log("Adding buffer probe on depay src pad on "
                     "eater %r", eater)
            eater.streamheaderBufferProbeHandler = \
                depay.get_pad("src").add_buffer_probe(
                    remove_in_caps_buffers, eater)

        self.unblock_eater(eaterAlias)

        # Swap the fd: take the element out of its parent, drop it to
        # READY, replace the fd, then put everything back.  The order
        # of these steps is deliberate and must not be changed.
        sinkpad = srcpad.get_peer()
        srcpad.unlink(sinkpad)
        parent = element.get_parent()
        parent.remove(element)
        self.log("setting to ready")
        element.set_state(gst.STATE_READY)
        self.log("setting to ready complete!!!")
        old = element.get_property('fd')
        self.log("Closing old fd %d", old)
        os.close(old)
        element.set_property('fd', fd)
        parent.add(element)
        srcpad.link(sinkpad)
        element.set_state(gst.STATE_PLAYING)

        srcpad.set_blocked_async(False, _block_cb)
    else:
        element.set_property('fd', fd)

    # Record on the eater which fd and feedId it is now eating from.
    eater.connected(fd, feedId)

    if not pipeline_playing:
        self.try_start_pipeline()
844