using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;

namespace FailiorSdk;

/// <summary>
/// Thrown by <see cref="Graph.SendPacketAsync"/> when the ingest endpoint answers
/// with a non-success HTTP status code.
/// </summary>
public sealed class HTTPStatusError : Exception
{
    /// <summary>Numeric HTTP status code of the rejected request.</summary>
    public int StatusCode { get; }

    /// <summary>Response body returned alongside the status code.</summary>
    public string Body { get; }

    /// <summary>Creates the error from the response status code and body text.</summary>
    public HTTPStatusError(int statusCode, string body)
        : base($"failior: request rejected ({statusCode})")
    {
        StatusCode = statusCode;
        Body = body;
    }
}

/// <summary>
/// Wire payload posted to the ingest endpoint. JSON property names are fixed by the
/// <see cref="JsonPropertyNameAttribute"/> on each member.
/// </summary>
public sealed class Packet
{
    /// <summary>True when the packet reports an error.</summary>
    [JsonPropertyName("did_error")]
    public bool DidError { get; set; }

    /// <summary>Free-form message; must be non-blank when <see cref="DidError"/> is true.</summary>
    [JsonPropertyName("packet_msg")]
    public string PacketMessage { get; set; } = string.Empty;

    /// <summary>Graph identifier; when blank, the sending <see cref="Graph"/> substitutes its own.</summary>
    [JsonPropertyName("graph_id")]
    public string GraphId { get; set; } = string.Empty;

    /// <summary>Node identifiers carried by the packet; must contain at least one non-blank entry to be sent.</summary>
    [JsonPropertyName("node_id_list")]
    public List<string> NodeIdList { get; set; } = new();

    /// <summary>Packet time; defaults to the moment the instance was created (UTC).</summary>
    [JsonPropertyName("timestamp")]
    public DateTimeOffset Timestamp { get; set; } = DateTimeOffset.UtcNow;
}

/// <summary>Outcome of <see cref="Graph.SendPacketBatchAsync"/>.</summary>
public sealed class PacketBatchResult
{
    /// <summary>Number of packets sent without an exception.</summary>
    public int Accepted { get; init; }

    /// <summary>Number of packets whose send threw.</summary>
    public int Failed { get; init; }

    /// <summary>The exceptions raised by failed sends, in the order of the input packets.</summary>
    public IReadOnlyList<Exception> Errors { get; init; } = Array.Empty<Exception>();
}

/// <summary>
/// Collects node ids and sends them as a single packet when <see cref="EndAsync"/> is called.
/// Created through <see cref="Graph.Track"/>.
/// </summary>
public sealed class Tracker
{
    private readonly Graph _graph;
    private readonly CancellationToken _cancellationToken;
    private readonly List<string> _nodes = new();
    private bool _ended;

    internal Tracker(Graph graph, CancellationToken cancellationToken)
    {
        _graph = graph;
        _cancellationToken = cancellationToken;
    }

    /// <summary>Records a node id (trimmed) to be included in the final packet.</summary>
    /// <param name="nodeId">Node identifier; must not be null, empty or whitespace.</param>
    /// <exception cref="ArgumentException">The id is blank after trimming.</exception>
    public void Node(string nodeId)
    {
        var trimmed = (nodeId ?? string.Empty).Trim();
        if (trimmed.Length == 0)
        {
            throw new ArgumentException("failior: node_id required", nameof(nodeId));
        }

        _nodes.Add(trimmed);
    }

    /// <summary>
    /// Sends the recorded nodes as one packet. Only the first call does anything; later calls
    /// return immediately, as does a call made when no node was recorded.
    /// </summary>
    /// <param name="err">When non-null the packet is flagged as an error and carries the exception message.</param>
    public async Task EndAsync(Exception? err = null)
    {
        if (_ended)
        {
            return;
        }

        // Marked before sending, so a failed send is not retried by a second call.
        _ended = true;

        if (_nodes.Count == 0)
        {
            return;
        }

        var message = string.Empty;
        if (err is not null)
        {
            // Error packets with a blank message are rejected during normalization; fall back
            // to the exception type name so the error report is still delivered.
            message = string.IsNullOrWhiteSpace(err.Message) ? err.GetType().Name : err.Message;
        }

        var packet = new Packet
        {
            DidError = err is not null,
            PacketMessage = message,
            GraphId = _graph.GraphId,
            NodeIdList = new List<string>(_nodes),
            Timestamp = DateTimeOffset.UtcNow,
        };

        await _graph.SendPacketAsync(packet, _cancellationToken).ConfigureAwait(false);
    }
}

/// <summary>
/// Client for one graph: builds packets and posts them as JSON to <c>{BaseUrl}/ingest</c>.
/// This type does not dispose the <see cref="HttpClient"/> it uses.
/// </summary>
public sealed class Graph
{
    /// <summary>Trimmed graph identifier used for packets that do not carry their own.</summary>
    public string GraphId { get; }

    /// <summary>Base URL with trailing slashes removed.</summary>
    public string BaseUrl { get; }

    /// <summary>Trimmed key sent in the <c>X-Ingress-Key</c> header; the header is omitted when empty.</summary>
    public string IngressKey { get; }

    /// <summary>Per-request timeout (5 seconds unless specified).</summary>
    public TimeSpan Timeout { get; }

    private readonly HttpClient _httpClient;

    // True when the constructor created the HttpClient; false when the caller injected one.
    private readonly bool _ownsHttpClient;

    /// <summary>Creates a graph client.</summary>
    /// <param name="graphId">Graph identifier; required.</param>
    /// <param name="baseUrl">Ingress base URL; required.</param>
    /// <param name="ingressKey">Optional ingress key.</param>
    /// <param name="timeout">Request timeout; defaults to 5 seconds.</param>
    /// <param name="httpClient">
    /// Optional client to send requests with. Its own <see cref="HttpClient.Timeout"/> is left
    /// untouched; <paramref name="timeout"/> is enforced per request instead.
    /// </param>
    /// <exception cref="ArgumentException"><paramref name="graphId"/> or <paramref name="baseUrl"/> is blank.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is outside the supported range.</exception>
    public Graph(
        string graphId,
        string baseUrl = "http://graph-ingress:8081",
        string ingressKey = "",
        TimeSpan? timeout = null,
        HttpClient? httpClient = null)
    {
        GraphId = (graphId ?? string.Empty).Trim();
        if (GraphId.Length == 0)
        {
            throw new ArgumentException("failior: graph_id required", nameof(graphId));
        }

        BaseUrl = (baseUrl ?? string.Empty).TrimEnd('/');
        if (BaseUrl.Length == 0)
        {
            throw new ArgumentException("failior: baseUrl required", nameof(baseUrl));
        }

        IngressKey = (ingressKey ?? string.Empty).Trim();

        Timeout = timeout ?? TimeSpan.FromSeconds(5);
        if (!IsSupportedTimeout(Timeout))
        {
            throw new ArgumentOutOfRangeException(nameof(timeout), Timeout, "failior: timeout out of range");
        }

        // Assigning HttpClient.Timeout on a caller-supplied client would change the timeout for
        // every other user of that client and throws InvalidOperationException once the client
        // has started a request, so only a client created here gets its Timeout property set.
        _ownsHttpClient = httpClient is null;
        _httpClient = httpClient ?? new HttpClient { Timeout = Timeout };
    }

    /// <summary>Starts a <see cref="Tracker"/> bound to this graph and the given token.</summary>
    public Tracker Track(CancellationToken cancellationToken = default) => new(this, cancellationToken);

    /// <summary>Reports <paramref name="nodeId"/> with status <c>ok</c>.</summary>
    public Task InformUpAsync(string nodeId, string message = "", CancellationToken cancellationToken = default)
        => InformAsync(nodeId, "ok", message, cancellationToken);

    /// <summary>Reports <paramref name="nodeId"/> with status <c>error</c>.</summary>
    public Task InformErrorAsync(string nodeId, string message, CancellationToken cancellationToken = default)
        => InformAsync(nodeId, "error", message, cancellationToken);

    /// <summary>Sends a single-node packet with the given status.</summary>
    /// <param name="nodeId">Node identifier; required.</param>
    /// <param name="status">Either <c>ok</c> or <c>error</c> (surrounding whitespace ignored).</param>
    /// <param name="message">Optional message; a null value is sent as an empty string.</param>
    /// <param name="cancellationToken">Token forwarded to the send.</param>
    /// <exception cref="ArgumentException">
    /// Thrown synchronously (before a task is returned) when <paramref name="nodeId"/> is blank or
    /// <paramref name="status"/> is not one of the two accepted values.
    /// </exception>
    public Task InformAsync(string nodeId, string status, string message = "", CancellationToken cancellationToken = default)
    {
        var node = (nodeId ?? string.Empty).Trim();
        if (node.Length == 0)
        {
            throw new ArgumentException("failior: node_id required", nameof(nodeId));
        }

        var normalizedStatus = (status ?? string.Empty).Trim();
        if (normalizedStatus != "ok" && normalizedStatus != "error")
        {
            throw new ArgumentException("failior: status must be ok or error", nameof(status));
        }

        var packet = new Packet
        {
            DidError = normalizedStatus == "error",
            PacketMessage = message ?? string.Empty,
            GraphId = GraphId,
            NodeIdList = new List<string> { node },
            Timestamp = DateTimeOffset.UtcNow,
        };

        return SendPacketAsync(packet, cancellationToken);
    }

    /// <summary>Validates <paramref name="packet"/> and posts it to <c>{BaseUrl}/ingest</c>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="packet"/> is null.</exception>
    /// <exception cref="ArgumentException">The packet fails validation.</exception>
    /// <exception cref="HTTPStatusError">The endpoint returned a non-success status code.</exception>
    public async Task SendPacketAsync(Packet packet, CancellationToken cancellationToken = default)
    {
        if (packet is null)
        {
            throw new ArgumentNullException(nameof(packet));
        }

        var payload = NormalizePacket(packet);

        // An injected client keeps its own Timeout setting, so the graph's timeout is applied
        // to this request through a linked token instead.
        using CancellationTokenSource? timeoutSource =
            _ownsHttpClient ? null : CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        timeoutSource?.CancelAfter(Timeout);
        var requestToken = timeoutSource?.Token ?? cancellationToken;

        using var request = new HttpRequestMessage(HttpMethod.Post, $"{BaseUrl}/ingest")
        {
            Content = JsonContent.Create(payload),
        };

        if (IngressKey.Length > 0)
        {
            request.Headers.Add("X-Ingress-Key", IngressKey);
        }

        using var response = await _httpClient.SendAsync(request, requestToken).ConfigureAwait(false);
        if (!response.IsSuccessStatusCode)
        {
            var body = await response.Content.ReadAsStringAsync(requestToken).ConfigureAwait(false);
            throw new HTTPStatusError((int)response.StatusCode, body);
        }
    }

    /// <summary>
    /// Sends every packet, running at most <paramref name="workers"/> sends at a time
    /// (values below 1 are treated as 1).
    /// </summary>
    /// <param name="packets">Packets to send; null is treated as an empty sequence.</param>
    /// <param name="workers">Maximum number of concurrent sends.</param>
    /// <param name="cancellationToken">Token used both for waiting on a worker slot and for each send.</param>
    /// <returns>Counts of accepted and failed packets plus the exceptions of the failed ones.</returns>
    /// <remarks>
    /// An exception thrown by a send is captured in the result. A cancellation observed while a
    /// packet is still waiting for a worker slot is not captured; it cancels the returned task.
    /// </remarks>
    public async Task<PacketBatchResult> SendPacketBatchAsync(
        IEnumerable<Packet> packets,
        int workers = 8,
        CancellationToken cancellationToken = default)
    {
        var packetList = packets?.ToList() ?? new List<Packet>();
        if (packetList.Count == 0)
        {
            return new PacketBatchResult { Accepted = 0, Failed = 0, Errors = Array.Empty<Exception>() };
        }

        // Task.WhenAll below completes only after every send has finished, so disposing the
        // semaphore when this method exits cannot race with a pending Release.
        using var gate = new SemaphoreSlim(Math.Max(1, workers));

        // Each send reports its own outcome (null = accepted), so no shared state is needed.
        async Task<Exception?> SendOneAsync(Packet packet)
        {
            await gate.WaitAsync(cancellationToken).ConfigureAwait(false);
            try
            {
                await SendPacketAsync(packet, cancellationToken).ConfigureAwait(false);
                return null;
            }
            catch (Exception ex)
            {
                return ex;
            }
            finally
            {
                gate.Release();
            }
        }

        var sends = packetList.Select(packet => SendOneAsync(packet)).ToList();
        var outcomes = await Task.WhenAll(sends).ConfigureAwait(false);
        var errors = outcomes.OfType<Exception>().ToList();

        return new PacketBatchResult
        {
            Accepted = outcomes.Length - errors.Count,
            Failed = errors.Count,
            Errors = errors,
        };
    }

    // Mirrors the range HttpClient.Timeout accepts: infinite, or positive and at most int.MaxValue ms.
    // System.Threading.Timeout is fully qualified because "Timeout" alone binds to the property above.
    private static bool IsSupportedTimeout(TimeSpan value)
        => value == System.Threading.Timeout.InfiniteTimeSpan
           || (value > TimeSpan.Zero && value.TotalMilliseconds <= int.MaxValue);

    // Returns a validated copy of the packet: ids trimmed, graph id defaulted to this graph's,
    // a default timestamp replaced by the current UTC time. Throws ArgumentException on invalid content.
    private Packet NormalizePacket(Packet packet)
    {
        var graphId = (packet.GraphId ?? string.Empty).Trim();
        if (graphId.Length == 0)
        {
            graphId = GraphId;
        }

        if (graphId.Length == 0)
        {
            throw new ArgumentException("failior: graph_id required");
        }

        var nodeIds = packet.NodeIdList?.Select(node => (node ?? string.Empty).Trim()).ToList()
                      ?? new List<string>();
        if (nodeIds.Count == 0)
        {
            throw new ArgumentException("failior: node_id_list cannot be empty");
        }

        if (nodeIds.Any(node => node.Length == 0))
        {
            throw new ArgumentException("failior: node_id_list contains empty node id");
        }

        if (packet.DidError && string.IsNullOrWhiteSpace(packet.PacketMessage))
        {
            throw new ArgumentException("failior: packet_msg required when did_error is true");
        }

        return new Packet
        {
            DidError = packet.DidError,
            PacketMessage = packet.PacketMessage ?? string.Empty,
            GraphId = graphId,
            NodeIdList = nodeIds,
            Timestamp = packet.Timestamp == default ? DateTimeOffset.UtcNow : packet.Timestamp,
        };
    }
}