Files
DLE/backend/services/ai-queue.js
2026-03-01 22:03:48 +03:00

311 lines
11 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* Copyright (c) 2024-2026 Тарабанов Александр Викторович
* All rights reserved.
*
* This software is proprietary and confidential.
* Unauthorized copying, modification, or distribution is prohibited.
*
* For licensing inquiries: info@hb3-accelerator.com
* Website: https://hb3-accelerator.com
* GitHub: https://github.com/VC-HB3-Accelerator
*/
const EventEmitter = require('events');
const logger = require('../utils/logger');
const axios = require('axios');
const ollamaConfig = require('./ollamaConfig');
const aiCache = require('./ai-cache');
const aiConfigService = require('./aiConfigService');
const { buildOllamaRequest } = require('../utils/ollamaRequestBuilder');
class AIQueue extends EventEmitter {
constructor() {
super();
const timeouts = ollamaConfig.getTimeouts();
this.queue = [];
this.isProcessing = false; // ✨ НОВОЕ: Флаг обработки
this.maxQueueSize = timeouts.queueMaxSize; // Из централизованных настроек
this.workerInterval = null; // ✨ НОВОЕ: Интервал worker
this.checkInterval = timeouts.queueInterval; // Интервал проверки очереди
this.stats = {
totalAdded: 0,
totalProcessed: 0,
totalFailed: 0,
avgResponseTime: 0,
lastProcessedAt: null,
initializedAt: Date.now()
};
}
// Добавление запроса в очередь
async addRequest(request, priority = 0) {
const requestId = Date.now() + Math.random();
const queueItem = {
id: requestId,
request,
priority,
status: 'queued',
timestamp: Date.now()
};
// Добавляем в очередь с учетом приоритета
this.queue.push(queueItem);
this.queue.sort((a, b) => b.priority - a.priority);
this.stats.totalAdded++;
logger.info(`[AIQueue] Добавлен запрос ${requestId} с приоритетом ${priority}. Очередь: ${this.queue.length}`);
// Эмитим событие о добавлении
this.emit('requestAdded', queueItem);
return requestId;
}
// Получение следующего запроса (без обработки)
getNextRequest() {
if (this.queue.length === 0) return null;
return this.queue.shift();
}
// Получение запроса по ID
getRequestById(requestId) {
return this.queue.find(item => item.id === requestId);
}
// Обновление статуса запроса
updateRequestStatus(requestId, status, result = null, error = null, responseTime = null) {
const item = this.queue.find(item => item.id === requestId);
if (!item) return false;
item.status = status;
item.result = result;
item.error = error;
item.responseTime = responseTime;
item.processedAt = Date.now();
if (status === 'completed') {
this.stats.totalProcessed++;
if (responseTime) {
this.updateAvgResponseTime(responseTime);
}
this.stats.lastProcessedAt = Date.now();
this.emit('requestCompleted', item);
} else if (status === 'failed') {
this.stats.totalFailed++;
this.stats.lastProcessedAt = Date.now();
this.emit('requestFailed', item);
}
return true;
}
// Обновление средней скорости ответа
updateAvgResponseTime(responseTime) {
const total = this.stats.totalProcessed;
this.stats.avgResponseTime =
(this.stats.avgResponseTime * (total - 1) + responseTime) / total;
}
// Получение статистики
getStats() {
return {
totalAdded: this.stats.totalAdded,
totalProcessed: this.stats.totalProcessed,
totalFailed: this.stats.totalFailed,
averageProcessingTime: this.stats.avgResponseTime,
currentQueueSize: this.queue.length,
lastProcessedAt: this.stats.lastProcessedAt,
uptime: Date.now() - this.stats.initializedAt
};
}
// Получение размера очереди
getQueueSize() {
return this.queue.length;
}
// Очистка очереди
clearQueue() {
const clearedCount = this.queue.length;
this.queue = [];
logger.info(`[AIQueue] Очередь очищена. Удалено запросов: ${clearedCount}`);
return clearedCount;
}
// Пауза/возобновление очереди
pause() {
this.isPaused = true;
}
resume() {
this.isPaused = false;
}
// Проверка статуса паузы
isQueuePaused() {
return this.isPaused;
}
// ✨ НОВОЕ: Добавление задачи с Promise (для ожидания результата)
async addTask(taskData) {
// Проверяем лимит очереди
if (this.queue.length >= this.maxQueueSize) {
throw new Error('Очередь переполнена');
}
const taskId = Date.now() + Math.random();
const queueItem = {
id: taskId,
request: taskData,
priority: 0, // Все задачи с одинаковым приоритетом (FIFO)
status: 'queued',
timestamp: Date.now()
};
this.queue.push(queueItem);
// Не сортируем - FIFO (First In First Out)
this.stats.totalAdded++;
logger.info(`[AIQueue] Задача ${taskId} добавлена. Очередь: ${this.queue.length}`);
this.emit('requestAdded', queueItem);
// Возвращаем Promise для ожидания результата
return new Promise((resolve, reject) => {
const timeouts = ollamaConfig.getTimeouts();
const timeout = setTimeout(() => {
reject(new Error('Queue timeout'));
}, timeouts.queueTimeout); // Централизованный таймаут очереди
this.once(`task_${taskId}_completed`, (result) => {
clearTimeout(timeout);
resolve(result.response);
});
this.once(`task_${taskId}_failed`, (error) => {
clearTimeout(timeout);
reject(new Error(error.message));
});
});
}
// ✨ НОВОЕ: Запуск автоматического worker
startWorker() {
if (this.workerInterval) {
logger.warn('[AIQueue] Worker уже запущен');
return;
}
logger.info('[AIQueue] 🚀 Запуск worker для обработки очереди...');
this.workerInterval = setInterval(() => {
this.processNextTask();
}, this.checkInterval); // Интервал из централизованных настроек
}
// ✨ НОВОЕ: Остановка worker
stopWorker() {
if (this.workerInterval) {
clearInterval(this.workerInterval);
this.workerInterval = null;
logger.info('[AIQueue] ⏹️ Worker остановлен');
}
}
// ✨ НОВОЕ: Обработка следующей задачи из очереди
async processNextTask() {
if (this.isProcessing) return;
const task = this.getNextRequest();
if (!task) return;
this.isProcessing = true;
const startTime = Date.now();
try {
logger.info(`[AIQueue] Обработка задачи ${task.id}`);
// 1. Проверяем кэш
const cacheKey = aiCache.generateKey(task.request.messages);
const cached = aiCache.get(cacheKey);
if (cached) {
logger.info(`[AIQueue] Cache HIT для задачи ${task.id}`);
const responseTime = Date.now() - startTime;
this.updateRequestStatus(task.id, 'completed', cached, null, responseTime);
this.emit(`task_${task.id}_completed`, { response: cached, fromCache: true });
return;
}
// 2. Загружаем параметры LLM и qwen из настроек
const llmParameters = task.request.llmParameters || await aiConfigService.getLLMParameters();
const qwenParameters = task.request.qwenParameters || await aiConfigService.getQwenSpecificParameters();
const ollamaConfig_data = await ollamaConfig.getConfigAsync();
// 3. Формируем тело запроса (используем утилиту)
const requestBody = buildOllamaRequest({
messages: task.request.messages,
model: task.request.model,
llmParameters: llmParameters,
qwenParameters: qwenParameters,
defaultModel: ollamaConfig_data.defaultModel,
tools: task.request.tools || null,
tool_choice: task.request.tool_choice || null,
stream: false
});
// 4. Вызываем Ollama API
const ollamaUrl = ollamaConfig.getBaseUrl();
const timeouts = ollamaConfig.getTimeouts();
logger.info(`[AIQueue] Отправка запроса в Ollama с параметрами:`, {
model: requestBody.model,
temperature: requestBody.temperature,
num_predict: requestBody.num_predict,
format: requestBody.format || 'не задан',
hasTools: !!requestBody.tools
});
const response = await axios.post(`${ollamaUrl}/api/chat`, requestBody, {
timeout: timeouts.ollamaChat
});
// Обработка function calls (если есть)
// ВАЖНО: Function calling в очереди не поддерживается, т.к. нужен userId
// Если ИИ запросил функции - возвращаем ответ без их выполнения
let result;
if (response.data.message.tool_calls && response.data.message.tool_calls.length > 0) {
logger.warn(`[AIQueue] ИИ запросил выполнение ${response.data.message.tool_calls.length} функций, но function calling в очереди не поддерживается`);
result = response.data.message.content || 'Функции не выполнены (не поддерживается в очереди)';
} else {
result = response.data.message.content;
}
const responseTime = Date.now() - startTime;
// 4. Сохраняем в кэш
aiCache.set(cacheKey, result);
// 5. Обновляем статус
this.updateRequestStatus(task.id, 'completed', result, null, responseTime);
this.emit(`task_${task.id}_completed`, { response: result, fromCache: false });
logger.info(`[AIQueue] ✅ Задача ${task.id} выполнена за ${responseTime}ms`);
} catch (error) {
logger.error(`[AIQueue] ❌ Ошибка задачи ${task.id}:`, error.message);
this.updateRequestStatus(task.id, 'failed', null, error.message);
this.emit(`task_${task.id}_failed`, { message: error.message });
} finally {
this.isProcessing = false;
}
}
}
module.exports = AIQueue;