/** * Copyright (c) 2024-2025 Тарабанов Александр Викторович * All rights reserved. * * This software is proprietary and confidential. * Unauthorized copying, modification, or distribution is prohibited. * * For licensing inquiries: info@hb3-accelerator.com * Website: https://hb3-accelerator.com * GitHub: https://github.com/VC-HB3-Accelerator */ const EventEmitter = require('events'); const logger = require('../utils/logger'); const axios = require('axios'); const ollamaConfig = require('./ollamaConfig'); const aiCache = require('./ai-cache'); const aiConfigService = require('./aiConfigService'); const { buildOllamaRequest } = require('../utils/ollamaRequestBuilder'); class AIQueue extends EventEmitter { constructor() { super(); const timeouts = ollamaConfig.getTimeouts(); this.queue = []; this.isProcessing = false; // ✨ НОВОЕ: Флаг обработки this.maxQueueSize = timeouts.queueMaxSize; // Из централизованных настроек this.workerInterval = null; // ✨ НОВОЕ: Интервал worker this.checkInterval = timeouts.queueInterval; // Интервал проверки очереди this.stats = { totalAdded: 0, totalProcessed: 0, totalFailed: 0, avgResponseTime: 0, lastProcessedAt: null, initializedAt: Date.now() }; } // Добавление запроса в очередь async addRequest(request, priority = 0) { const requestId = Date.now() + Math.random(); const queueItem = { id: requestId, request, priority, status: 'queued', timestamp: Date.now() }; // Добавляем в очередь с учетом приоритета this.queue.push(queueItem); this.queue.sort((a, b) => b.priority - a.priority); this.stats.totalAdded++; logger.info(`[AIQueue] Добавлен запрос ${requestId} с приоритетом ${priority}. Очередь: ${this.queue.length}`); // Эмитим событие о добавлении this.emit('requestAdded', queueItem); return requestId; } // Получение следующего запроса (без обработки) getNextRequest() { if (this.queue.length === 0) return null; return this.queue.shift(); } // Получение запроса по ID getRequestById(requestId) { return this.queue.find(item => item.id === requestId); } // Обновление статуса запроса updateRequestStatus(requestId, status, result = null, error = null, responseTime = null) { const item = this.queue.find(item => item.id === requestId); if (!item) return false; item.status = status; item.result = result; item.error = error; item.responseTime = responseTime; item.processedAt = Date.now(); if (status === 'completed') { this.stats.totalProcessed++; if (responseTime) { this.updateAvgResponseTime(responseTime); } this.stats.lastProcessedAt = Date.now(); this.emit('requestCompleted', item); } else if (status === 'failed') { this.stats.totalFailed++; this.stats.lastProcessedAt = Date.now(); this.emit('requestFailed', item); } return true; } // Обновление средней скорости ответа updateAvgResponseTime(responseTime) { const total = this.stats.totalProcessed; this.stats.avgResponseTime = (this.stats.avgResponseTime * (total - 1) + responseTime) / total; } // Получение статистики getStats() { return { totalAdded: this.stats.totalAdded, totalProcessed: this.stats.totalProcessed, totalFailed: this.stats.totalFailed, averageProcessingTime: this.stats.avgResponseTime, currentQueueSize: this.queue.length, lastProcessedAt: this.stats.lastProcessedAt, uptime: Date.now() - this.stats.initializedAt }; } // Получение размера очереди getQueueSize() { return this.queue.length; } // Очистка очереди clearQueue() { const clearedCount = this.queue.length; this.queue = []; logger.info(`[AIQueue] Очередь очищена. Удалено запросов: ${clearedCount}`); return clearedCount; } // Пауза/возобновление очереди pause() { this.isPaused = true; } resume() { this.isPaused = false; } // Проверка статуса паузы isQueuePaused() { return this.isPaused; } // ✨ НОВОЕ: Добавление задачи с Promise (для ожидания результата) async addTask(taskData) { // Проверяем лимит очереди if (this.queue.length >= this.maxQueueSize) { throw new Error('Очередь переполнена'); } const taskId = Date.now() + Math.random(); const queueItem = { id: taskId, request: taskData, priority: 0, // Все задачи с одинаковым приоритетом (FIFO) status: 'queued', timestamp: Date.now() }; this.queue.push(queueItem); // Не сортируем - FIFO (First In First Out) this.stats.totalAdded++; logger.info(`[AIQueue] Задача ${taskId} добавлена. Очередь: ${this.queue.length}`); this.emit('requestAdded', queueItem); // Возвращаем Promise для ожидания результата return new Promise((resolve, reject) => { const timeouts = ollamaConfig.getTimeouts(); const timeout = setTimeout(() => { reject(new Error('Queue timeout')); }, timeouts.queueTimeout); // Централизованный таймаут очереди this.once(`task_${taskId}_completed`, (result) => { clearTimeout(timeout); resolve(result.response); }); this.once(`task_${taskId}_failed`, (error) => { clearTimeout(timeout); reject(new Error(error.message)); }); }); } // ✨ НОВОЕ: Запуск автоматического worker startWorker() { if (this.workerInterval) { logger.warn('[AIQueue] Worker уже запущен'); return; } logger.info('[AIQueue] 🚀 Запуск worker для обработки очереди...'); this.workerInterval = setInterval(() => { this.processNextTask(); }, this.checkInterval); // Интервал из централизованных настроек } // ✨ НОВОЕ: Остановка worker stopWorker() { if (this.workerInterval) { clearInterval(this.workerInterval); this.workerInterval = null; logger.info('[AIQueue] ⏹️ Worker остановлен'); } } // ✨ НОВОЕ: Обработка следующей задачи из очереди async processNextTask() { if (this.isProcessing) return; const task = this.getNextRequest(); if (!task) return; this.isProcessing = true; const startTime = Date.now(); try { logger.info(`[AIQueue] Обработка задачи ${task.id}`); // 1. Проверяем кэш const cacheKey = aiCache.generateKey(task.request.messages); const cached = aiCache.get(cacheKey); if (cached) { logger.info(`[AIQueue] Cache HIT для задачи ${task.id}`); const responseTime = Date.now() - startTime; this.updateRequestStatus(task.id, 'completed', cached, null, responseTime); this.emit(`task_${task.id}_completed`, { response: cached, fromCache: true }); return; } // 2. Загружаем параметры LLM и qwen из настроек const llmParameters = task.request.llmParameters || await aiConfigService.getLLMParameters(); const qwenParameters = task.request.qwenParameters || await aiConfigService.getQwenSpecificParameters(); const ollamaConfig_data = await ollamaConfig.getConfigAsync(); // 3. Формируем тело запроса (используем утилиту) const requestBody = buildOllamaRequest({ messages: task.request.messages, model: task.request.model, llmParameters: llmParameters, qwenParameters: qwenParameters, defaultModel: ollamaConfig_data.defaultModel, tools: task.request.tools || null, tool_choice: task.request.tool_choice || null, stream: false }); // 4. Вызываем Ollama API const ollamaUrl = ollamaConfig.getBaseUrl(); const timeouts = ollamaConfig.getTimeouts(); logger.info(`[AIQueue] Отправка запроса в Ollama с параметрами:`, { model: requestBody.model, temperature: requestBody.temperature, num_predict: requestBody.num_predict, format: requestBody.format || 'не задан', hasTools: !!requestBody.tools }); const response = await axios.post(`${ollamaUrl}/api/chat`, requestBody, { timeout: timeouts.ollamaChat }); // Обработка function calls (если есть) // ВАЖНО: Function calling в очереди не поддерживается, т.к. нужен userId // Если ИИ запросил функции - возвращаем ответ без их выполнения let result; if (response.data.message.tool_calls && response.data.message.tool_calls.length > 0) { logger.warn(`[AIQueue] ИИ запросил выполнение ${response.data.message.tool_calls.length} функций, но function calling в очереди не поддерживается`); result = response.data.message.content || 'Функции не выполнены (не поддерживается в очереди)'; } else { result = response.data.message.content; } const responseTime = Date.now() - startTime; // 4. Сохраняем в кэш aiCache.set(cacheKey, result); // 5. Обновляем статус this.updateRequestStatus(task.id, 'completed', result, null, responseTime); this.emit(`task_${task.id}_completed`, { response: result, fromCache: false }); logger.info(`[AIQueue] ✅ Задача ${task.id} выполнена за ${responseTime}ms`); } catch (error) { logger.error(`[AIQueue] ❌ Ошибка задачи ${task.id}:`, error.message); this.updateRequestStatus(task.id, 'failed', null, error.message); this.emit(`task_${task.id}_failed`, { message: error.message }); } finally { this.isProcessing = false; } } } module.exports = AIQueue;