const express = require("express"); const cors = require("cors"); const WebSocket = require("ws"); const app = express(); const PORT_HTTP = 4005; const PORT_WS = 4006; const PUSHGATEWAY = "http://pushgateway:9091"; // --- HTTP server --- app.use(cors({ origin: "*", methods: ["GET", "POST"], allowedHeaders: ["Content-Type"], credentials: true })); app.use(express.json({ limit: "50mb" })); app.post("/push", (req, res) => { const metric = req.body; if (metric) pushToProm(metric); res.send("ok"); }); // --- WS server --- const wss = new WebSocket.Server({ port: PORT_WS }); wss.on("connection", ws => { console.log("New WS connection. Online users:", wss.clients.size); pushOnlineCount(); ws.on("message", msg => { try { const m = JSON.parse(msg); pushToProm(m); } catch (e) { console.error("Invalid WS message:", e); } }); ws.on("close", () => pushOnlineCount()); }); // --- Push metrics to Pushgateway --- async function pushToProm(metric) { if (!metric || !metric.type) return; const lines = []; let job = "webapp"; // default job let groupingKey = {}; switch (metric.type) { case "frontend_metrics": job = "webapp_frontend"; lines.push(`# HELP frontend_load_time Frontend page load time in ms`); lines.push(`# TYPE frontend_load_time gauge`); lines.push(`frontend_load_time{instance="${metric.id || 'default'}"} ${metric.navigation?.loadEventEnd || 0}`); if (metric.memory?.usedJSHeapSize) { lines.push(`# HELP frontend_memory_used JS heap used in MB`); lines.push(`# TYPE frontend_memory_used gauge`); lines.push(`frontend_memory_used{instance="${metric.id || 'default'}"} ${metric.memory.usedJSHeapSize / 1024 / 1024}`); } if (metric.resources?.length) { lines.push(`# HELP frontend_resource_count Number of resources loaded`); lines.push(`# TYPE frontend_resource_count gauge`); lines.push(`frontend_resource_count{instance="${metric.id || 'default'}"} ${metric.resources.length}`); } break; case "rest": job = "webapp_api"; lines.push(`# HELP rest_request_duration_ms REST request duration in ms`); lines.push(`# TYPE rest_request_duration_ms gauge`); lines.push(`rest_request_duration_ms{path="${metric.path}",status="${metric.status}"} ${metric.time}`); break; case "ws_in": job = "webapp_ws"; lines.push(`# HELP ws_in_bytes_total WS bytes in`); lines.push(`# TYPE ws_in_bytes_total counter`); lines.push(`ws_in_bytes_total ${metric.length}`); break; case "ws_out": job = "webapp_ws"; lines.push(`# HELP ws_out_bytes_total WS bytes out`); lines.push(`# TYPE ws_out_bytes_total counter`); lines.push(`ws_out_bytes_total ${metric.length}`); break; case "connection_status": job = "webapp_connection"; lines.push(`# HELP users_online Users online`); lines.push(`# TYPE users_online gauge`); lines.push(`users_online ${metric.count || wss.clients.size}`); const putBody = lines.join("\n") + "\n"; // Отправляем как PUT для Gauge, чтобы сбросить старые значения return await pushToPrometheus(`${PUSHGATEWAY}/metrics/job/${job}`, putBody, "PUT"); } const body = lines.join("\n") + "\n"; console.log(`Pushing metric for job=${job}:\n${body}\n`); // Используем POST по умолчанию для инкремента/добавления await pushToPrometheus(`${PUSHGATEWAY}/metrics/job/${job}`, body, "POST"); } async function pushToPrometheus(url, body, method) { try { const response = await fetch(url, { method: method, headers: { "Content-Type": "text/plain" }, body }); if (response.status !== 200) { console.error(`PushGateway error (${method} ${url}):`, response.status, await response.text()); } } catch (err) { console.error("PushGateway network error:", err.message); } } function pushOnlineCount() { pushToProm({ type: "connection_status", count: wss.clients.size }); } // --- Start HTTP server --- app.listen(PORT_HTTP, () => console.log("Metrics HTTP listening on", PORT_HTTP)); console.log("Metrics WS listening on", PORT_WS); // Запуск интервала после инициализации wss setInterval(pushOnlineCount, 5000);