Files
2023-03-12 20:34:16 +00:00

415 lines
14 KiB
C#

// Copyright (c) 2021 homuler
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
using System;
using System.Collections.Generic;
namespace Mediapipe.Unity
{
public class OutputEventArgs<TValue> : EventArgs
{
public readonly TValue value;
public OutputEventArgs(TValue value)
{
this.value = value;
}
}
public class OutputStream<TPacket, TValue> where TPacket : Packet<TValue>, new()
{
private static int _Counter = 0;
private static readonly GlobalInstanceTable<int, OutputStream<TPacket, TValue>> _InstanceTable = new GlobalInstanceTable<int, OutputStream<TPacket, TValue>>(20);
protected readonly CalculatorGraph calculatorGraph;
private readonly int _id;
public readonly string streamName;
public readonly string presenceStreamName;
public readonly bool observeTimestampBounds;
private OutputStreamPoller<TValue> _poller;
private TPacket _outputPacket;
private OutputStreamPoller<bool> _presencePoller;
private BoolPacket _presencePacket;
private long _lastTimestampMicrosec;
private long _timeoutMicrosec;
public long timeoutMicrosec
{
get => _timeoutMicrosec;
set => _timeoutMicrosec = Math.Max(0, value);
}
protected event EventHandler<OutputEventArgs<TValue>> OnReceived;
private TPacket _referencePacket;
protected TPacket referencePacket
{
get
{
if (_referencePacket == null)
{
_referencePacket = Packet<TValue>.Create<TPacket>(IntPtr.Zero, false);
}
return _referencePacket;
}
}
protected bool canTestPresence => presenceStreamName != null;
/// <summary>
/// Initialize a new instance of the <see cref="OutputStream" /> class.
/// </summary>
/// <remarks>
/// If <paramref name="observeTimestampBounds" /> is set to <c>false</c>, there are no ways to know whether the output is present or not.<br/>
/// This can be especially problematic <br/>
/// - when trying to get the output synchronously, because the thread will hang forever if no value is output.<br />
/// - when trying to get the output using callbacks, because they won't be called while no value is output.<br />
/// </remarks>
/// <param name="calculatorGraph">The owner of the stream</param>
/// <param name="streamName">The name of the stream</param>
/// <param name="observeTimestampBounds">
/// This parameter controlls the behaviour when no output is present. <br/>
/// When no output is present, if it's set to <c>true</c>, the stream outputs an empty packet, but if it's <c>false</c>, the stream does not output packets.
/// </param>
/// <param name="timeoutMicrosec">
/// If the output packet is empty, the <see cref="OutputStream" /> instance drops the packet until the period specified here elapses.
/// </param>
public OutputStream(CalculatorGraph calculatorGraph, string streamName, bool observeTimestampBounds = true, long timeoutMicrosec = 0)
{
_id = System.Threading.Interlocked.Increment(ref _Counter);
this.calculatorGraph = calculatorGraph;
this.streamName = streamName;
this.observeTimestampBounds = observeTimestampBounds;
this.timeoutMicrosec = timeoutMicrosec;
_InstanceTable.Add(_id, this);
}
/// <summary>
/// Initialize a new instance of the <see cref="OutputStream" /> class.<br />
/// It's necessary for the graph to have <c>PacketPresenceCalculator</c> node that calculates if the stream has output or not.
/// </summary>
/// <remarks>
/// This is useful when you want to get the output synchronously but don't want to block the thread while waiting for the output.
/// </remarks>
/// <param name="calculatorGraph">The owner of the stream</param>
/// <param name="streamName">The name of the stream</param>
/// <param name="presenceStreamName">
/// The name of the stream that outputs true iff the output is present.
/// </param>
/// <param name="timeoutMicrosec">
/// If the output packet is empty, the <see cref="OutputStream" /> instance drops the packet until the period specified here elapses.
/// </param>
public OutputStream(CalculatorGraph calculatorGraph, string streamName, string presenceStreamName, long timeoutMicrosec = 0) : this(calculatorGraph, streamName, false, timeoutMicrosec)
{
this.presenceStreamName = presenceStreamName;
}
public Status StartPolling()
{
_outputPacket = new TPacket();
var statusOrPoller = calculatorGraph.AddOutputStreamPoller<TValue>(streamName, observeTimestampBounds);
var status = statusOrPoller.status;
if (status.Ok())
{
_poller = statusOrPoller.Value();
}
if (presenceStreamName == null)
{
return status;
}
_presencePacket = new BoolPacket();
var statusOrPresencePoller = calculatorGraph.AddOutputStreamPoller<bool>(presenceStreamName, false);
status = statusOrPresencePoller.status;
if (status.Ok())
{
_presencePoller = statusOrPresencePoller.Value();
}
return status;
}
public void AddListener(EventHandler<OutputEventArgs<TValue>> callback)
{
if (OnReceived == null)
{
calculatorGraph.ObserveOutputStream(streamName, _id, InvokeIfOutputStreamFound, observeTimestampBounds).AssertOk();
}
OnReceived += callback;
}
public void RemoveListener(EventHandler<OutputEventArgs<TValue>> eventHandler)
{
OnReceived -= eventHandler;
}
public void RemoveAllListeners()
{
OnReceived = null;
}
public void Close()
{
RemoveAllListeners();
_poller?.Dispose();
_poller = null;
_outputPacket?.Dispose();
_outputPacket = null;
_presencePoller?.Dispose();
_presencePoller = null;
_presencePacket?.Dispose();
_presencePacket = null;
_referencePacket?.Dispose();
_referencePacket = null;
}
/// <summary>
/// Gets the next value from the stream.
/// This method drops a packet whose timestamp is less than <paramref name="timestampThreshold" />.
/// </summary>
/// <remarks>
/// <para>
/// If <see cref="observeTimestampBounds" /> is set to true, MediaPipe outputs an empty packet when no output is present, but this method ignores those packets.
/// This behavior is useful to avoid outputting an empty value when the detection fails for a particular frame.
/// </para>
/// <para>
/// When <paramref name="value" /> is set to the default value, you cannot tell whether it's because the output was empty or not.
/// </para>
/// </remarks>
/// <param name="value">
/// When this method returns, it contains the next output value if it's present and retrieved successfully; otherwise, the default value for the type of the value parameter.
/// This parameter is passed uninitialized.
/// </param>
/// <param name="timestampThreshold">
/// Drops outputs whose timestamp is less than this value.
/// </param>
/// <param name="allowBlock">
/// If <c>true</c>, this method can block the thread until the value is retrieved.<br />
/// It can be set to <c>false</c> only if <see cref="presenceStreamName" /> is set.
/// </param>
/// <returns>
/// <c>true</c> if <paramref name="value" /> is successfully retrieved; otherwise <c>false</c>.
/// </returns>
public bool TryGetNext(out TValue value, long timestampThreshold, bool allowBlock = true)
{
var timestampMicrosec = long.MinValue;
while (timestampMicrosec < timestampThreshold)
{
if (!CanCallNext(allowBlock) || !Next())
{
value = default;
return false;
}
using (var timestamp = _outputPacket.Timestamp())
{
timestampMicrosec = timestamp.Microseconds();
}
}
if (_outputPacket.IsEmpty())
{
value = default; // TODO: distinguish when the output is empty and when it's not (retrieved value can be the default value).
return false;
}
_lastTimestampMicrosec = timestampMicrosec;
value = _outputPacket.Get();
return true;
}
public bool TryGetNext(out TValue value, bool allowBlock = true)
{
return TryGetNext(out value, 0, allowBlock);
}
public bool TryConsumeNext(out TValue value, long timestampThreshold, bool allowBlock = true)
{
long timestampMicrosec = 0;
while (timestampMicrosec <= timestampThreshold)
{
if (!CanCallNext(allowBlock) || !Next())
{
value = default;
return true;
}
using (var timestamp = _outputPacket.Timestamp())
{
timestampMicrosec = timestamp.Microseconds();
}
}
if (_outputPacket.IsEmpty())
{
value = default; // TODO: distinguish when the output is empty and when it's not (retrieved value can be the default value).
return false;
}
_lastTimestampMicrosec = timestampMicrosec;
var statusOrValue = _outputPacket.Consume();
value = statusOrValue.ValueOr();
return true;
}
public bool TryConsumeNext(out TValue value, bool allowBlock = true)
{
return TryConsumeNext(out value, 0, allowBlock);
}
public bool ResetTimestampIfTimedOut(long timestampMicrosec, long timeoutMicrosec)
{
if (timestampMicrosec - _lastTimestampMicrosec <= timeoutMicrosec)
{
return false;
}
_lastTimestampMicrosec = timestampMicrosec;
return true;
}
protected bool CanCallNext(bool allowBlock)
{
if (_poller == null)
{
Logger.LogWarning("OutputStreamPoller is not initialized. Call StartPolling before running the CalculatorGraph");
return false;
}
if (canTestPresence)
{
if (!allowBlock)
{
if (_presencePoller.QueueSize() <= 0)
{
return false;
}
}
if (!NextPresence() || !_presencePacket.Get())
{
// NOTE: _presencePacket.IsEmpty() always returns false
return false;
}
}
else if (!allowBlock)
{
Logger.LogWarning("Cannot avoid thread being blocked when `presenceStreamName` is not set");
return false;
}
return true;
}
protected bool NextPresence()
{
return Next(_presencePoller, _presencePacket, presenceStreamName);
}
protected bool Next()
{
return Next(_poller, _outputPacket, streamName);
}
protected static bool Next<T>(OutputStreamPoller<T> poller, Packet<T> packet, string stream)
{
if (!poller.Next(packet))
{
Logger.LogWarning($"Failed to get next value from {stream}. See logs for more details");
return false;
}
return true;
}
protected bool TryGetPacketValue(Packet<TValue> packet, out TValue value, long timeoutMicrosec = 0)
{
using (var timestamp = packet.Timestamp())
{
var currentMicrosec = timestamp.Microseconds();
if (!packet.IsEmpty())
{
_lastTimestampMicrosec = currentMicrosec;
value = packet.Get();
return true;
}
value = default; // TODO: distinguish when the output is empty and when it's not (retrieved value can be the default value).
var hasTimedOut = currentMicrosec - _lastTimestampMicrosec >= timeoutMicrosec;
if (hasTimedOut)
{
_lastTimestampMicrosec = currentMicrosec;
}
return hasTimedOut;
}
}
protected bool TryConsumePacketValue(Packet<TValue> packet, out TValue value, long timeoutMicrosec = 0)
{
using (var timestamp = packet.Timestamp())
{
var currentMicrosec = timestamp.Microseconds();
if (!packet.IsEmpty())
{
_lastTimestampMicrosec = currentMicrosec;
var statusOrValue = packet.Consume();
value = statusOrValue.ValueOr();
return true;
}
value = default; // TODO: distinguish when the output is empty and when it's not (retrieved value can be the default value).
var hasTimedOut = currentMicrosec - _lastTimestampMicrosec >= timeoutMicrosec;
if (hasTimedOut)
{
_lastTimestampMicrosec = currentMicrosec;
}
return hasTimedOut;
}
}
[AOT.MonoPInvokeCallback(typeof(CalculatorGraph.NativePacketCallback))]
protected static Status.StatusArgs InvokeIfOutputStreamFound(IntPtr graphPtr, int streamId, IntPtr packetPtr)
{
try
{
var isFound = _InstanceTable.TryGetValue(streamId, out var outputStream);
if (!isFound)
{
return Status.StatusArgs.NotFound($"OutputStream with id {streamId} is not found, maybe already GCed");
}
if (outputStream.calculatorGraph.mpPtr != graphPtr)
{
return Status.StatusArgs.InvalidArgument($"OutputStream is found, but is not linked to the specified CalclatorGraph");
}
outputStream.referencePacket.SwitchNativePtr(packetPtr);
if (outputStream.TryGetPacketValue(outputStream.referencePacket, out var value, outputStream.timeoutMicrosec))
{
outputStream.OnReceived?.Invoke(outputStream, new OutputEventArgs<TValue>(value));
}
outputStream.referencePacket.ReleaseMpResource();
return Status.StatusArgs.Ok();
}
catch (Exception e)
{
return Status.StatusArgs.Internal(e.ToString());
}
}
}
}