#include <curl/curl.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <ctime>
#include <exception>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

namespace failior {

// One report sent to the ingress endpoint. Field order is part of the
// aggregate-initialization interface and must not change.
struct Packet {
  // True when the packet reports a failure.
  bool did_error{false};
  // Free-form message; Graph requires it to be non-blank when did_error is set.
  std::string packet_msg{};
  // Identifier of the graph the packet belongs to.
  std::string graph_id{};
  // Identifiers of the nodes this packet refers to.
  std::vector<std::string> node_id_list{};
  // Timestamp string; Graph fills it with the current time when left blank.
  std::string timestamp{};
};

// Summary returned by a batch send.
struct PacketBatchResult {
  // Number of packets that were sent without an exception.
  int accepted{0};
  // Number of packets that were not accepted.
  int failed{0};
  // Messages of the exceptions raised while sending.
  std::vector<std::string> errors{};
};

// Exception carrying an HTTP status code and the response body that came with
// it. what() yields "failior: request rejected (<status>)".
class HTTPStatusError : public std::runtime_error {
 public:
  // Stores `status_code` and takes ownership of `body`.
  HTTPStatusError(long status_code, std::string body)
      : std::runtime_error(Describe(status_code)),
        status_code_(status_code),
        body_(std::move(body)) {}

  // HTTP status code supplied at construction.
  long status_code() const { return status_code_; }

  // Response body supplied at construction.
  const std::string& body() const { return body_; }

 private:
  // Builds the what() text for a given status code.
  static std::string Describe(long code) {
    std::string text = "failior: request rejected (";
    text += std::to_string(code);
    text += ")";
    return text;
  }

  long status_code_;
  std::string body_;
};

// Returns the current UTC time formatted as "YYYY-MM-DDTHH:MM:SS.mmmZ".
//
// The whole-second part and the millisecond part are both derived from the
// same floored time point. system_clock::to_time_t may round or truncate
// (implementation-defined), so converting the un-floored time point could
// yield a seconds field that disagrees with the milliseconds field.
std::string Rfc3339Now() {
  namespace chrono = std::chrono;

  const auto now = chrono::system_clock::now();
  // floor (not a truncating cast) keeps the remainder in [0, 1s).
  const auto whole_seconds = chrono::floor<chrono::seconds>(now);
  const auto millis =
      chrono::duration_cast<chrono::milliseconds>(now - whole_seconds).count();

  const std::time_t tt = chrono::system_clock::to_time_t(whole_seconds);
  std::tm utc_tm{};
#ifdef _WIN32
  gmtime_s(&utc_tm, &tt);
#else
  gmtime_r(&tt, &utc_tm);
#endif

  // Zero-initialised so the buffer is a valid (empty) C string even if
  // strftime reports failure and leaves the contents unspecified.
  char base[32] = {};
  std::strftime(base, sizeof(base), "%Y-%m-%dT%H:%M:%S", &utc_tm);

  std::ostringstream out;
  out << base << "." << std::setw(3) << std::setfill('0') << millis << "Z";
  return out.str();
}

// Escapes `input` so it can be placed between double quotes in a JSON
// document.
//
// Backslash and double quote get a backslash prefix; newline, carriage return,
// tab, backspace and form feed use their short escapes; every other byte below
// 0x20 (including NUL) is written as \u00XX, because JSON forbids raw control
// characters inside strings. All remaining bytes, including bytes >= 0x80, are
// copied through unchanged.
std::string JsonEscape(const std::string& input) {
  static const char kHexDigits[] = "0123456789abcdef";

  std::string out;
  out.reserve(input.size());
  for (const char c : input) {
    switch (c) {
      case '\\':
        out += "\\\\";
        break;
      case '"':
        out += "\\\"";
        break;
      case '\n':
        out += "\\n";
        break;
      case '\r':
        out += "\\r";
        break;
      case '\t':
        out += "\\t";
        break;
      case '\b':
        out += "\\b";
        break;
      case '\f':
        out += "\\f";
        break;
      default: {
        // Compare as unsigned: plain char may be signed, which would make
        // bytes >= 0x80 look like control characters.
        const auto byte = static_cast<unsigned char>(c);
        if (byte < 0x20) {
          out += "\\u00";
          out += kHexDigits[byte >> 4];
          out += kHexDigits[byte & 0x0F];
        } else {
          out += c;
        }
        break;
      }
    }
  }
  return out;
}

class Graph;

// Accumulates node ids for one tracked operation and reports them through the
// Graph it was created with when End() is called.
class Tracker {
 public:
  // `graph` is stored as a non-owning pointer.
  explicit Tracker(Graph* graph) : graph_(graph) {}

  // Records `node_id` after stripping surrounding whitespace.
  // Throws std::invalid_argument when nothing remains after stripping.
  void Node(const std::string& node_id) {
    std::string cleaned = Trim(node_id);
    if (!cleaned.empty()) {
      nodes_.push_back(std::move(cleaned));
      return;
    }
    throw std::invalid_argument("failior: node_id required");
  }

  // Finishes the tracker; a non-null `err` marks the report as an error.
  void End(const std::exception* err = nullptr);

 private:
  // Returns `value` without leading/trailing spaces, tabs, CRs and LFs.
  static std::string Trim(const std::string& value) {
    static const char* const kBlanks = " \t\r\n";
    const std::string::size_type begin = value.find_first_not_of(kBlanks);
    if (begin != std::string::npos) {
      const std::string::size_type end = value.find_last_not_of(kBlanks);
      return value.substr(begin, end - begin + 1);
    }
    return std::string();
  }

  Graph* graph_;
  std::vector<std::string> nodes_;
  bool ended_ = false;
};

class Graph {
 public:
  Graph(std::string graph_id, std::string base_url = "http://graph-ingress:8081",
        std::string ingress_key = "", long timeout_ms = 5000)
      : graph_id_(std::move(graph_id)),
        base_url_(TrimTrailingSlash(base_url)),
        ingress_key_(std::move(ingress_key)),
        timeout_ms_(timeout_ms) {
    if (Trim(graph_id_).empty()) {
      throw std::invalid_argument("failior: graph_id required");
    }
    if (Trim(base_url_).empty()) {
      throw std::invalid_argument("failior: base_url required");
    }
    graph_id_ = Trim(graph_id_);
    ingress_key_ = Trim(ingress_key_);
  }

  Tracker Track() { return Tracker(this); }

  void Inform(const std::string& node_id, const std::string& status, const std::string& message = "") {
    const std::string trimmed_status = Trim(status);
    if (trimmed_status != "ok" && trimmed_status != "error") {
      throw std::invalid_argument("failior: status must be ok or error");
    }
    Packet packet;
    packet.did_error = trimmed_status == "error";
    packet.packet_msg = message;
    packet.graph_id = graph_id_;
    packet.node_id_list = {node_id};
    packet.timestamp = Rfc3339Now();
    SendPacket(packet);
  }

  void InformUp(const std::string& node_id, const std::string& message = "") {
    Inform(node_id, "ok", message);
  }

  void InformError(const std::string& node_id, const std::string& message) {
    Inform(node_id, "error", message);
  }

  void SendPacket(const Packet& packet) {
    Packet normalized = NormalizePacket(packet);
    const std::string body = PacketToJson(normalized);
    std::string response_body;
    long status_code = 0;
    CURLcode code = PostJson(base_url_ + "/ingest", body, &status_code, &response_body);
    if (code != CURLE_OK) {
      throw std::runtime_error("failior: request failed");
    }
    if (status_code < 200 || status_code >= 300) {
      throw HTTPStatusError(status_code, response_body);
    }
  }

  PacketBatchResult SendPacketBatch(const std::vector<Packet>& packets, int workers = 8) {
    PacketBatchResult result;
    if (packets.empty()) {
      return result;
    }
    const int width = std::max(1, std::min(workers, static_cast<int>(packets.size())));
    std::atomic<std::size_t> index = 0;
    std::atomic<int> accepted = 0;
    std::mutex errors_mu;
    std::vector<std::string> errors;
    std::vector<std::thread> threads;
    threads.reserve(width);

    for (int i = 0; i < width; ++i) {
      threads.emplace_back([&]() {
        while (true) {
          const std::size_t current = index.fetch_add(1);
          if (current >= packets.size()) {
            return;
          }
          try {
            SendPacket(packets[current]);
            accepted.fetch_add(1);
          } catch (const std::exception& ex) {
            std::lock_guard<std::mutex> lock(errors_mu);
            errors.push_back(ex.what());
          }
        }
      });
    }
    for (auto& thread : threads) {
      thread.join();
    }
    result.accepted = accepted.load();
    result.failed = static_cast<int>(packets.size()) - result.accepted;
    result.errors = std::move(errors);
    return result;
  }

  const std::string& graph_id() const { return graph_id_; }

 private:
  static std::string Trim(const std::string& value) {
    const auto first = value.find_first_not_of(" \t\r\n");
    if (first == std::string::npos) {
      return "";
    }
    const auto last = value.find_last_not_of(" \t\r\n");
    return value.substr(first, last - first + 1);
  }

  static std::string TrimTrailingSlash(const std::string& value) {
    std::string out = Trim(value);
    while (!out.empty() && out.back() == '/') {
      out.pop_back();
    }
    return out;
  }

  Packet NormalizePacket(const Packet& packet) const {
    Packet out = packet;
    out.graph_id = Trim(out.graph_id.empty() ? graph_id_ : out.graph_id);
    if (out.graph_id.empty()) {
      throw std::invalid_argument("failior: graph_id required");
    }
    if (out.node_id_list.empty()) {
      throw std::invalid_argument("failior: node_id_list cannot be empty");
    }
    for (std::string& node : out.node_id_list) {
      node = Trim(node);
      if (node.empty()) {
        throw std::invalid_argument("failior: node_id_list contains empty node id");
      }
    }
    if (out.did_error && Trim(out.packet_msg).empty()) {
      throw std::invalid_argument("failior: packet_msg required when did_error is true");
    }
    if (Trim(out.timestamp).empty()) {
      out.timestamp = Rfc3339Now();
    }
    return out;
  }

  static size_t WriteCallback(char* ptr, size_t size, size_t nmemb, void* userdata) {
    const size_t total = size * nmemb;
    auto* out = static_cast<std::string*>(userdata);
    out->append(ptr, total);
    return total;
  }

  CURLcode PostJson(const std::string& url, const std::string& body, long* status_code, std::string* response_body) const {
    CURL* curl = curl_easy_init();
    if (curl == nullptr) {
      return CURLE_FAILED_INIT;
    }
    struct curl_slist* headers = nullptr;
    headers = curl_slist_append(headers, "Content-Type: application/json");
    if (!ingress_key_.empty()) {
      headers = curl_slist_append(headers, ("X-Ingress-Key: " + ingress_key_).c_str());
    }

    curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
    curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str());
    curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms_);
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, response_body);

    CURLcode code = curl_easy_perform(curl);
    if (code == CURLE_OK) {
      curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, status_code);
    }

    curl_slist_free_all(headers);
    curl_easy_cleanup(curl);
    return code;
  }

  static std::string PacketToJson(const Packet& packet) {
    std::ostringstream out;
    out << "{";
    out << "\"did_error\":" << (packet.did_error ? "true" : "false") << ",";
    out << "\"packet_msg\":\"" << JsonEscape(packet.packet_msg) << "\",";
    out << "\"graph_id\":\"" << JsonEscape(packet.graph_id) << "\",";
    out << "\"node_id_list\":[";
    for (std::size_t i = 0; i < packet.node_id_list.size(); ++i) {
      if (i > 0) {
        out << ",";
      }
      out << "\"" << JsonEscape(packet.node_id_list[i]) << "\"";
    }
    out << "],";
    out << "\"timestamp\":\"" << JsonEscape(packet.timestamp) << "\"";
    out << "}";
    return out.str();
  }

  std::string graph_id_;
  std::string base_url_;
  std::string ingress_key_;
  long timeout_ms_;
};

void Tracker::End(const std::exception* err) {
  if (ended_) {
    return;
  }
  ended_ = true;
  if (nodes_.empty()) {
    return;
  }
  Packet packet;
  packet.did_error = err != nullptr;
  packet.packet_msg = err ? err->what() : "";
  packet.graph_id = graph_->graph_id();
  packet.node_id_list = nodes_;
  packet.timestamp = Rfc3339Now();
  graph_->SendPacket(packet);
}

}  // namespace failior
