// Copyright (c) 2021 homuler
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

using System;
using System.Collections.Generic;

namespace Mediapipe.Unity
{
  /// <summary>
  ///   Event data passed to listeners registered on an <see cref="OutputStream{TPacket, TValue}"/>.
  /// </summary>
  /// <typeparam name="TValue">The type of the value carried by the event.</typeparam>
  public class OutputEventArgs<TValue> : EventArgs
  {
    /// <summary>The value received from the stream.</summary>
    public readonly TValue value;

    public OutputEventArgs(TValue value)
    {
      this.value = value;
    }
  }

  /// <summary>
  ///   Wraps a named output stream of a <see cref="CalculatorGraph"/> and exposes its values
  ///   either synchronously (polling) or asynchronously (listeners).
  /// </summary>
  /// <typeparam name="TPacket">The packet type used to receive values from the stream.</typeparam>
  /// <typeparam name="TValue">The type of the value contained in <typeparamref name="TPacket"/>.</typeparam>
  public class OutputStream<TPacket, TValue> where TPacket : Packet<TValue>, new()
  {
    // Source of the unique ids under which instances are registered in _InstanceTable.
    private static int _Counter = 0;

    // Maps an instance id to its instance so that the static native callback can find its target.
    private static readonly GlobalInstanceTable<int, OutputStream<TPacket, TValue>> _InstanceTable = new GlobalInstanceTable<int, OutputStream<TPacket, TValue>>(20);

    protected readonly CalculatorGraph calculatorGraph;

    private readonly int _id;
    public readonly string streamName;
    public readonly string presenceStreamName;
    public readonly bool observeTimestampBounds;

    private OutputStreamPoller<TValue> _poller;
    private TPacket _outputPacket;

    private OutputStreamPoller<bool> _presencePoller;
    private BoolPacket _presencePacket;

    // Timestamp of the last packet that was reported (or of the last timeout reset).
    private long _lastTimestampMicrosec;
    private long _timeoutMicrosec;

    /// <summary>
    ///   The timeout (in microseconds) applied to empty packets received through listeners.
    ///   Negative values are clamped to zero.
    /// </summary>
    public long timeoutMicrosec
    {
      get => _timeoutMicrosec;
      set => _timeoutMicrosec = Math.Max(0, value);
    }

    protected event EventHandler<OutputEventArgs<TValue>> OnReceived;

    private TPacket _referencePacket;

    /// <summary>
    ///   A lazily created packet that does not point to any native packet initially.
    ///   The native callback temporarily points it at the packet it receives.
    /// </summary>
    protected TPacket referencePacket
    {
      get
      {
        if (_referencePacket == null)
        {
          _referencePacket = Packet.Create<TPacket>(IntPtr.Zero, false);
        }
        return _referencePacket;
      }
    }

    /// <summary><c>true</c> if a presence stream was specified for this instance.</summary>
    protected bool canTestPresence => presenceStreamName != null;

    /// <summary>
    ///   Initialize a new instance of the <see cref="OutputStream{TPacket, TValue}"/> class.
    /// </summary>
    /// <remarks>
    ///   If <paramref name="observeTimestampBounds"/> is set to <c>false</c>, there are no ways to know whether the output is present or not.<br/>
    ///   This can be especially problematic<br/>
    ///    - when trying to get the output synchronously, because the thread will hang forever if no value is output.<br/>
    ///    - when trying to get the output using callbacks, because they won't be called while no value is output.<br/>
    /// </remarks>
    /// <param name="calculatorGraph">The owner of the stream</param>
    /// <param name="streamName">The name of the stream</param>
    /// <param name="observeTimestampBounds">
    ///   This parameter controls the behaviour when no output is present.<br/>
    ///   When no output is present, if it's set to <c>true</c>, the stream outputs an empty packet, but if it's <c>false</c>, the stream does not output packets.
    /// </param>
    /// <param name="timeoutMicrosec">
    ///   If the output packet is empty, the instance drops the packet until the period specified here elapses.
    /// </param>
    public OutputStream(CalculatorGraph calculatorGraph, string streamName, bool observeTimestampBounds = true, long timeoutMicrosec = 0)
    {
      _id = System.Threading.Interlocked.Increment(ref _Counter);
      this.calculatorGraph = calculatorGraph;
      this.streamName = streamName;
      this.observeTimestampBounds = observeTimestampBounds;
      this.timeoutMicrosec = timeoutMicrosec;

      _InstanceTable.Add(_id, this);
    }

    /// <summary>
    ///   Initialize a new instance of the <see cref="OutputStream{TPacket, TValue}"/> class.<br/>
    ///   It's necessary for the graph to have <c>PacketPresenceCalculator</c> node that calculates if the stream has output or not.
    /// </summary>
    /// <remarks>
    ///   This is useful when you want to get the output synchronously but don't want to block the thread while waiting for the output.
    /// </remarks>
    /// <param name="calculatorGraph">The owner of the stream</param>
    /// <param name="streamName">The name of the stream</param>
    /// <param name="presenceStreamName">
    ///   The name of the stream that outputs true iff the output is present.
    /// </param>
    /// <param name="timeoutMicrosec">
    ///   If the output packet is empty, the instance drops the packet until the period specified here elapses.
    /// </param>
    public OutputStream(CalculatorGraph calculatorGraph, string streamName, string presenceStreamName, long timeoutMicrosec = 0) : this(calculatorGraph, streamName, false, timeoutMicrosec)
    {
      this.presenceStreamName = presenceStreamName;
    }

    /// <summary>
    ///   Registers the pollers required to read this stream synchronously:
    ///   one for the stream itself and, when a presence stream is specified, one for the presence stream.
    /// </summary>
    /// <returns>
    ///   The status of the first registration that failed; otherwise the status of the last registration.
    /// </returns>
    public Status StartPolling()
    {
      _outputPacket = new TPacket();

      var statusOrPoller = calculatorGraph.AddOutputStreamPoller<TValue>(streamName, observeTimestampBounds);
      var status = statusOrPoller.status;
      if (!status.Ok())
      {
        // Report this failure as is; otherwise a successful presence poller registration would hide it.
        return status;
      }
      _poller = statusOrPoller.Value();

      if (presenceStreamName == null)
      {
        return status;
      }

      _presencePacket = new BoolPacket();

      var statusOrPresencePoller = calculatorGraph.AddOutputStreamPoller<bool>(presenceStreamName, false);
      status = statusOrPresencePoller.status;
      if (status.Ok())
      {
        _presencePoller = statusOrPresencePoller.Value();
      }
      return status;
    }

    /// <summary>
    ///   Registers <paramref name="callback"/> to be invoked when the stream outputs a value.
    ///   The stream is observed on the graph when there are no listeners registered at the time of the call.
    /// </summary>
    public void AddListener(EventHandler<OutputEventArgs<TValue>> callback)
    {
      if (OnReceived == null)
      {
        calculatorGraph.ObserveOutputStream(streamName, _id, InvokeIfOutputStreamFound, observeTimestampBounds).AssertOk();
      }
      OnReceived += callback;
    }

    /// <summary>Unregisters <paramref name="eventHandler"/>.</summary>
    public void RemoveListener(EventHandler<OutputEventArgs<TValue>> eventHandler)
    {
      OnReceived -= eventHandler;
    }

    /// <summary>Unregisters all the listeners.</summary>
    public void RemoveAllListeners()
    {
      OnReceived = null;
    }

    /// <summary>
    ///   Removes all the listeners and disposes the pollers and the packets held by this instance.
    /// </summary>
    public void Close()
    {
      RemoveAllListeners();

      _poller?.Dispose();
      _poller = null;
      _outputPacket?.Dispose();
      _outputPacket = null;

      _presencePoller?.Dispose();
      _presencePoller = null;
      _presencePacket?.Dispose();
      _presencePacket = null;

      _referencePacket?.Dispose();
      _referencePacket = null;
    }

    /// <summary>
    ///   Gets the next value from the stream.
    ///   This method drops a packet whose timestamp is less than <paramref name="timestampThreshold"/>.
    /// </summary>
    /// <remarks>
    ///   <para>
    ///     If <see cref="observeTimestampBounds"/> is set to <c>true</c>, MediaPipe outputs an empty packet when no output is present, but this method ignores those packets.
    ///     This behavior is useful to avoid outputting an empty value when the detection fails for a particular frame.
    ///   </para>
    ///   <para>
    ///     When <paramref name="value"/> is set to the default value, you cannot tell whether it's because the output was empty or not.
    ///   </para>
    /// </remarks>
    /// <param name="value">
    ///   When this method returns, it contains the next output value if it's present and retrieved successfully; otherwise, the default value for the type of the value parameter.
    ///   This parameter is passed uninitialized.
    /// </param>
    /// <param name="timestampThreshold">
    ///   Drops outputs whose timestamp is less than this value.
    /// </param>
    /// <param name="allowBlock">
    ///   If <c>true</c>, this method can block the thread until the value is retrieved.<br/>
    ///   It can be set to <c>false</c> only if <see cref="presenceStreamName"/> is set.
    /// </param>
    /// <returns>
    ///   <c>true</c> if <paramref name="value"/> is successfully retrieved; otherwise <c>false</c>.
    /// </returns>
    public bool TryGetNext(out TValue value, long timestampThreshold, bool allowBlock = true)
    {
      var timestampMicrosec = long.MinValue;

      while (timestampMicrosec < timestampThreshold)
      {
        if (!CanCallNext(allowBlock) || !Next())
        {
          value = default;
          return false;
        }
        using (var timestamp = _outputPacket.Timestamp())
        {
          timestampMicrosec = timestamp.Microseconds();
        }
      }

      if (_outputPacket.IsEmpty())
      {
        value = default; // TODO: distinguish when the output is empty and when it's not (retrieved value can be the default value).
        return false;
      }

      _lastTimestampMicrosec = timestampMicrosec;
      value = _outputPacket.Get();
      return true;
    }

    /// <summary>
    ///   Gets the next value from the stream, using <c>0</c> as the timestamp threshold.
    /// </summary>
    public bool TryGetNext(out TValue value, bool allowBlock = true)
    {
      return TryGetNext(out value, 0, allowBlock);
    }

    /// <summary>
    ///   Consumes the next value from the stream.
    ///   This method drops a packet whose timestamp is less than <paramref name="timestampThreshold"/>.
    /// </summary>
    /// <param name="value">
    ///   When this method returns <c>true</c>, the result of consuming the packet; otherwise, the default value for the type of the value parameter.
    /// </param>
    /// <param name="timestampThreshold">
    ///   Drops outputs whose timestamp is less than this value.
    /// </param>
    /// <param name="allowBlock">
    ///   If <c>true</c>, this method can block the thread until the value is retrieved.<br/>
    ///   It can be set to <c>false</c> only if <see cref="presenceStreamName"/> is set.
    /// </param>
    /// <returns>
    ///   <c>true</c> if a non-empty packet is retrieved; otherwise <c>false</c>.
    /// </returns>
    public bool TryConsumeNext(out TValue value, long timestampThreshold, bool allowBlock = true)
    {
      // Start below any valid timestamp so that at least one packet is always polled,
      // and accept a packet whose timestamp equals the threshold (same as TryGetNext).
      var timestampMicrosec = long.MinValue;

      while (timestampMicrosec < timestampThreshold)
      {
        if (!CanCallNext(allowBlock) || !Next())
        {
          value = default;
          return false; // nothing was retrieved, so this must not be reported as a success
        }
        using (var timestamp = _outputPacket.Timestamp())
        {
          timestampMicrosec = timestamp.Microseconds();
        }
      }

      if (_outputPacket.IsEmpty())
      {
        value = default; // TODO: distinguish when the output is empty and when it's not (retrieved value can be the default value).
        return false;
      }

      _lastTimestampMicrosec = timestampMicrosec;
      var statusOrValue = _outputPacket.Consume();

      value = statusOrValue.ValueOr();
      return true;
    }

    /// <summary>
    ///   Consumes the next value from the stream, using <c>0</c> as the timestamp threshold.
    /// </summary>
    public bool TryConsumeNext(out TValue value, bool allowBlock = true)
    {
      return TryConsumeNext(out value, 0, allowBlock);
    }

    /// <summary>
    ///   Sets the last timestamp to <paramref name="timestampMicrosec"/> if more than
    ///   <paramref name="timeoutMicrosec"/> has elapsed since the last timestamp.
    /// </summary>
    /// <returns>
    ///   <c>true</c> if the last timestamp is updated; otherwise <c>false</c>.
    /// </returns>
    public bool ResetTimestampIfTimedOut(long timestampMicrosec, long timeoutMicrosec)
    {
      if (timestampMicrosec - _lastTimestampMicrosec <= timeoutMicrosec)
      {
        return false;
      }
      _lastTimestampMicrosec = timestampMicrosec;
      return true;
    }

    /// <summary>
    ///   Checks whether the output poller can be asked for the next packet.
    /// </summary>
    /// <param name="allowBlock">
    ///   If <c>false</c>, this method returns <c>false</c> instead of polling when the presence stream is not set
    ///   or when the presence poller's queue is empty.
    /// </param>
    /// <returns>
    ///   <c>false</c> if the poller is not initialized, if blocking cannot be avoided while <paramref name="allowBlock"/> is <c>false</c>,
    ///   or if the presence stream reports that no output is present; otherwise <c>true</c>.
    /// </returns>
    protected bool CanCallNext(bool allowBlock)
    {
      if (_poller == null)
      {
        Logger.LogWarning("OutputStreamPoller is not initialized. Call StartPolling before running the CalculatorGraph");
        return false;
      }

      if (canTestPresence)
      {
        if (!allowBlock)
        {
          if (_presencePoller.QueueSize() <= 0)
          {
            return false;
          }
        }
        if (!NextPresence() || !_presencePacket.Get())
        {
          // NOTE: _presencePacket.IsEmpty() always returns false
          return false;
        }
      }
      else if (!allowBlock)
      {
        Logger.LogWarning("Cannot avoid thread being blocked when `presenceStreamName` is not set");
        return false;
      }
      return true;
    }

    /// <summary>Polls the next packet of the presence stream into the presence packet.</summary>
    protected bool NextPresence()
    {
      return Next(_presencePoller, _presencePacket, presenceStreamName);
    }

    /// <summary>Polls the next packet of the stream into the output packet.</summary>
    protected bool Next()
    {
      return Next(_poller, _outputPacket, streamName);
    }

    /// <summary>
    ///   Polls the next packet from <paramref name="poller"/> into <paramref name="packet"/>
    ///   and logs a warning naming <paramref name="stream"/> on failure.
    /// </summary>
    /// <returns><c>true</c> if the poller returns a packet; otherwise <c>false</c>.</returns>
    protected static bool Next<T>(OutputStreamPoller<T> poller, Packet<T> packet, string stream)
    {
      if (!poller.Next(packet))
      {
        Logger.LogWarning($"Failed to get next value from {stream}. See logs for more details");
        return false;
      }
      return true;
    }

    /// <summary>
    ///   Gets the value of <paramref name="packet"/> and records its timestamp as the last timestamp.
    /// </summary>
    /// <param name="value">
    ///   The value of the packet if it's not empty; otherwise the default value.
    /// </param>
    /// <param name="timeoutMicrosec">
    ///   An empty packet is reported only when at least this period has elapsed since the last timestamp.
    /// </param>
    /// <returns>
    ///   <c>true</c> if the packet is not empty, or if it's empty and has timed out; otherwise <c>false</c>.
    /// </returns>
    protected bool TryGetPacketValue(Packet<TValue> packet, out TValue value, long timeoutMicrosec = 0)
    {
      using (var timestamp = packet.Timestamp())
      {
        var currentMicrosec = timestamp.Microseconds();

        if (!packet.IsEmpty())
        {
          _lastTimestampMicrosec = currentMicrosec;
          value = packet.Get();
          return true;
        }

        value = default; // TODO: distinguish when the output is empty and when it's not (retrieved value can be the default value).
        var hasTimedOut = currentMicrosec - _lastTimestampMicrosec >= timeoutMicrosec;

        if (hasTimedOut)
        {
          _lastTimestampMicrosec = currentMicrosec;
        }
        return hasTimedOut;
      }
    }

    /// <summary>
    ///   Consumes the value of <paramref name="packet"/> and records its timestamp as the last timestamp.
    /// </summary>
    /// <param name="value">
    ///   The result of consuming the packet if it's not empty; otherwise the default value.
    /// </param>
    /// <param name="timeoutMicrosec">
    ///   An empty packet is reported only when at least this period has elapsed since the last timestamp.
    /// </param>
    /// <returns>
    ///   <c>true</c> if the packet is not empty, or if it's empty and has timed out; otherwise <c>false</c>.
    /// </returns>
    protected bool TryConsumePacketValue(Packet<TValue> packet, out TValue value, long timeoutMicrosec = 0)
    {
      using (var timestamp = packet.Timestamp())
      {
        var currentMicrosec = timestamp.Microseconds();

        if (!packet.IsEmpty())
        {
          _lastTimestampMicrosec = currentMicrosec;
          var statusOrValue = packet.Consume();

          value = statusOrValue.ValueOr();
          return true;
        }

        value = default; // TODO: distinguish when the output is empty and when it's not (retrieved value can be the default value).
        var hasTimedOut = currentMicrosec - _lastTimestampMicrosec >= timeoutMicrosec;

        if (hasTimedOut)
        {
          _lastTimestampMicrosec = currentMicrosec;
        }
        return hasTimedOut;
      }
    }

    /// <summary>
    ///   The native callback registered by <see cref="AddListener"/>.
    ///   It looks up the instance registered under <paramref name="streamId"/>, verifies that it belongs to the graph
    ///   identified by <paramref name="graphPtr"/>, and notifies its listeners of the value in the received packet.
    /// </summary>
    /// <returns>
    ///   OK on success; NotFound if no instance is registered under <paramref name="streamId"/>;
    ///   InvalidArgument if the instance belongs to another graph; Internal if an exception is thrown.
    /// </returns>
    [AOT.MonoPInvokeCallback(typeof(CalculatorGraph.NativePacketCallback))]
    protected static Status.StatusArgs InvokeIfOutputStreamFound(IntPtr graphPtr, int streamId, IntPtr packetPtr)
    {
      try
      {
        var isFound = _InstanceTable.TryGetValue(streamId, out var outputStream);

        if (!isFound)
        {
          return Status.StatusArgs.NotFound($"OutputStream with id {streamId} is not found, maybe already GCed");
        }
        if (outputStream.calculatorGraph.mpPtr != graphPtr)
        {
          return Status.StatusArgs.InvalidArgument($"OutputStream is found, but is not linked to the specified CalclatorGraph");
        }

        // Point the reusable packet at the received native packet only for the duration of this callback.
        outputStream.referencePacket.SwitchNativePtr(packetPtr);
        if (outputStream.TryGetPacketValue(outputStream.referencePacket, out var value, outputStream.timeoutMicrosec))
        {
          outputStream.OnReceived?.Invoke(outputStream, new OutputEventArgs<TValue>(value));
        }
        outputStream.referencePacket.ReleaseMpResource();

        return Status.StatusArgs.Ok();
      }
      catch (Exception e)
      {
        // Exceptions are converted to a status rather than rethrown from this callback.
        return Status.StatusArgs.Internal(e.ToString());
      }
    }
  }
}