Files
DLE/backend/services/ai-queue.js

377 lines
12 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* Copyright (c) 2024-2025 Тарабанов Александр Викторович
* All rights reserved.
*
* This software is proprietary and confidential.
* Unauthorized copying, modification, or distribution is prohibited.
*
* For licensing inquiries: info@hb3-accelerator.com
* Website: https://hb3-accelerator.com
* GitHub: https://github.com/HB3-ACCELERATOR
*/
const Queue = require('better-queue');
const logger = require('../utils/logger');
class AIQueueService {
constructor() {
this.queue = null;
this.isInitialized = false;
this.userRequestTimes = new Map(); // Добавляем Map для отслеживания запросов пользователей
this.stats = {
totalProcessed: 0,
totalFailed: 0,
averageProcessingTime: 0,
currentQueueSize: 0,
lastProcessedAt: null
};
this.initQueue();
}
initQueue() {
try {
this.queue = new Queue(this.processTask.bind(this), {
// Ограничиваем количество одновременных запросов к Ollama
concurrent: 2,
// Максимальное время выполнения задачи
maxTimeout: 180000, // 3 минуты
// Задержка между задачами для предотвращения перегрузки
afterProcessDelay: 1000, // 1 секунда
// Максимальное количество повторных попыток
maxRetries: 2,
// Задержка между повторными попытками
retryDelay: 5000, // 5 секунд
// Функция определения приоритета
priority: this.getTaskPriority.bind(this),
// Функция фильтрации задач
filter: this.filterTask.bind(this),
// Функция слияния одинаковых задач
merge: this.mergeTasks.bind(this),
// ID задачи для предотвращения дублирования
id: 'requestId'
});
this.setupEventListeners();
this.isInitialized = true;
logger.info('[AIQueue] Queue initialized successfully');
} catch (error) {
logger.error('[AIQueue] Failed to initialize queue:', error);
this.isInitialized = false;
}
}
// Определение приоритета задачи
getTaskPriority(task, cb) {
try {
let priority = 1; // Базовый приоритет
// Высокий приоритет для администраторов
if (task.userRole === 'admin') {
priority += 10;
}
// Приоритет по типу запроса
switch (task.type) {
case 'urgent':
priority += 20;
break;
case 'chat':
priority += 5;
break;
case 'analysis':
priority += 3;
break;
case 'generation':
priority += 1;
break;
}
// Приоритет по размеру запроса (короткие запросы имеют больший приоритет)
if (task.message && task.message.length < 100) {
priority += 2;
}
// Приоритет по времени ожидания
const waitTime = Date.now() - task.timestamp;
if (waitTime > 30000) { // Более 30 секунд ожидания
priority += 5;
}
cb(null, priority);
} catch (error) {
cb(error, 1);
}
}
// Фильтрация задач
filterTask(task, cb) {
try {
// Проверяем обязательные поля
if (!task.message || typeof task.message !== 'string') {
return cb('Invalid message format');
}
if (!task.requestId) {
return cb('Missing request ID');
}
// Проверяем размер сообщения
if (task.message.length > 10000) {
return cb('Message too long (max 10000 characters)');
}
// Проверяем частоту запросов от пользователя
if (this.isUserRateLimited(task.userId)) {
return cb('User rate limit exceeded');
}
cb(null, task);
} catch (error) {
cb(error);
}
}
// Слияние одинаковых задач
mergeTasks(oldTask, newTask, cb) {
try {
// Если это тот же запрос от того же пользователя, обновляем метаданные
if (oldTask.message === newTask.message && oldTask.userId === newTask.userId) {
oldTask.timestamp = newTask.timestamp;
oldTask.retryCount = (oldTask.retryCount || 0) + 1;
cb(null, oldTask);
} else {
cb(null, newTask);
}
} catch (error) {
cb(error);
}
}
// Обработка задачи
async processTask(task, cb) {
const startTime = Date.now();
const taskId = task.requestId;
try {
logger.info(`[AIQueue] Processing task ${taskId} for user ${task.userId}`);
// Импортируем AI сервис
const aiAssistant = require('./ai-assistant');
const encryptedDb = require('./encryptedDatabaseService');
// Выполняем AI запрос
const result = await aiAssistant.getResponse(
task.message,
task.language || 'auto',
task.history || null,
task.systemPrompt || '',
task.rules || null
);
const processingTime = Date.now() - startTime;
// Сохраняем AI ответ в базу данных
if (task.conversationId && result) {
try {
const aiMessage = await encryptedDb.saveData('messages', {
conversation_id: task.conversationId,
user_id: task.userId,
content: result,
sender_type: 'assistant',
role: 'assistant',
channel: 'web'
});
// Получаем расшифрованные данные для WebSocket
const decryptedAiMessage = await encryptedDb.getData('messages', { id: aiMessage.id }, 1);
if (decryptedAiMessage && decryptedAiMessage[0]) {
// Отправляем сообщение через WebSocket
const { broadcastChatMessage } = require('../wsHub');
broadcastChatMessage(decryptedAiMessage[0], task.userId);
}
logger.info(`[AIQueue] AI response saved for conversation ${task.conversationId}`);
} catch (dbError) {
logger.error(`[AIQueue] Error saving AI response:`, dbError);
}
}
// Обновляем статистику
this.updateStats(true, processingTime);
logger.info(`[AIQueue] Task ${taskId} completed in ${processingTime}ms`);
cb(null, {
success: true,
result,
processingTime,
taskId
});
} catch (error) {
const processingTime = Date.now() - startTime;
// Обновляем статистику
this.updateStats(false, processingTime);
logger.error(`[AIQueue] Task ${taskId} failed:`, error);
cb(null, {
success: false,
error: error.message,
processingTime,
taskId
});
}
}
// Добавление задачи в очередь
addTask(taskData) {
if (!this.isInitialized || !this.queue) {
throw new Error('Queue is not initialized');
}
const task = {
...taskData,
timestamp: Date.now(),
requestId: taskData.requestId || `req_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`
};
return new Promise((resolve, reject) => {
const ticket = this.queue.push(task, (error, result) => {
if (error) {
reject(error);
} else {
resolve(result);
}
});
// Добавляем обработчики событий для билета
ticket.on('failed', (error) => {
logger.error(`[AIQueue] Task ${task.requestId} failed:`, error);
reject(error);
});
ticket.on('finish', (result) => {
logger.info(`[AIQueue] Task ${task.requestId} finished`);
resolve(result);
});
});
}
// Настройка обработчиков событий очереди
setupEventListeners() {
this.queue.on('task_queued', (taskId) => {
logger.info(`[AIQueue] Task ${taskId} queued`);
this.stats.currentQueueSize = this.queue.length;
});
this.queue.on('task_started', (taskId) => {
logger.info(`[AIQueue] Task ${taskId} started`);
});
this.queue.on('task_finish', (taskId, result) => {
logger.info(`[AIQueue] Task ${taskId} finished successfully`);
this.stats.lastProcessedAt = new Date();
this.stats.currentQueueSize = this.queue.length;
});
this.queue.on('task_failed', (taskId, error) => {
logger.error(`[AIQueue] Task ${taskId} failed:`, error);
this.stats.currentQueueSize = this.queue.length;
});
this.queue.on('empty', () => {
logger.info('[AIQueue] Queue is empty');
this.stats.currentQueueSize = 0;
});
this.queue.on('drain', () => {
logger.info('[AIQueue] Queue drained');
this.stats.currentQueueSize = 0;
});
}
// Обновление статистики
updateStats(success, processingTime) {
this.stats.totalProcessed++;
if (!success) {
this.stats.totalFailed++;
}
// Обновляем среднее время обработки
const totalTime = this.stats.averageProcessingTime * (this.stats.totalProcessed - 1) + processingTime;
this.stats.averageProcessingTime = totalTime / this.stats.totalProcessed;
}
// Проверка ограничения частоты запросов пользователя
isUserRateLimited(userId) {
// Простая реализация - можно улучшить с использованием Redis
const now = Date.now();
const userRequests = this.userRequestTimes.get(userId) || [];
// Удаляем старые запросы (старше 1 минуты)
const recentRequests = userRequests.filter(time => now - time < 60000);
// Ограничиваем до 10 запросов в минуту
if (recentRequests.length >= 10) {
return true;
}
// Добавляем текущий запрос
recentRequests.push(now);
this.userRequestTimes.set(userId, recentRequests);
return false;
}
// Получение статистики очереди
getStats() {
const queueStats = this.queue ? this.queue.getStats() : {};
return {
...this.stats,
queueStats,
isInitialized: this.isInitialized,
currentQueueSize: this.queue ? this.queue.length : 0,
runningTasks: this.queue ? this.queue.running : 0
};
}
// Очистка очереди
clear() {
if (this.queue) {
this.queue.destroy();
this.initQueue();
}
}
// Пауза/возобновление очереди
pause() {
if (this.queue) {
this.queue.pause();
logger.info('[AIQueue] Queue paused');
}
}
resume() {
if (this.queue) {
this.queue.resume();
logger.info('[AIQueue] Queue resumed');
}
}
}
// Создаем и экспортируем единственный экземпляр
const aiQueueService = new AIQueueService();
module.exports = aiQueueService;