feat: новая функция
This commit is contained in:
@@ -12,11 +12,20 @@
|
||||
|
||||
const EventEmitter = require('events');
|
||||
const logger = require('../utils/logger');
|
||||
const axios = require('axios');
|
||||
const ollamaConfig = require('./ollamaConfig');
|
||||
const aiCache = require('./ai-cache');
|
||||
|
||||
class AIQueue extends EventEmitter {
|
||||
constructor() {
|
||||
super();
|
||||
const timeouts = ollamaConfig.getTimeouts();
|
||||
|
||||
this.queue = [];
|
||||
this.isProcessing = false; // ✨ НОВОЕ: Флаг обработки
|
||||
this.maxQueueSize = timeouts.queueMaxSize; // Из централизованных настроек
|
||||
this.workerInterval = null; // ✨ НОВОЕ: Интервал worker
|
||||
this.checkInterval = timeouts.queueInterval; // Интервал проверки очереди
|
||||
this.stats = {
|
||||
totalAdded: 0,
|
||||
totalProcessed: 0,
|
||||
@@ -135,6 +144,133 @@ class AIQueue extends EventEmitter {
|
||||
isQueuePaused() {
|
||||
return this.isPaused;
|
||||
}
|
||||
|
||||
// ✨ НОВОЕ: Добавление задачи с Promise (для ожидания результата)
|
||||
async addTask(taskData) {
|
||||
// Проверяем лимит очереди
|
||||
if (this.queue.length >= this.maxQueueSize) {
|
||||
throw new Error('Очередь переполнена');
|
||||
}
|
||||
|
||||
const taskId = Date.now() + Math.random();
|
||||
|
||||
const queueItem = {
|
||||
id: taskId,
|
||||
request: taskData,
|
||||
priority: 0, // Все задачи с одинаковым приоритетом (FIFO)
|
||||
status: 'queued',
|
||||
timestamp: Date.now()
|
||||
};
|
||||
|
||||
this.queue.push(queueItem);
|
||||
// Не сортируем - FIFO (First In First Out)
|
||||
this.stats.totalAdded++;
|
||||
|
||||
logger.info(`[AIQueue] Задача ${taskId} добавлена. Очередь: ${this.queue.length}`);
|
||||
this.emit('requestAdded', queueItem);
|
||||
|
||||
// Возвращаем Promise для ожидания результата
|
||||
return new Promise((resolve, reject) => {
|
||||
const timeouts = ollamaConfig.getTimeouts();
|
||||
const timeout = setTimeout(() => {
|
||||
reject(new Error('Queue timeout'));
|
||||
}, timeouts.queueTimeout); // Централизованный таймаут очереди
|
||||
|
||||
this.once(`task_${taskId}_completed`, (result) => {
|
||||
clearTimeout(timeout);
|
||||
resolve(result.response);
|
||||
});
|
||||
|
||||
this.once(`task_${taskId}_failed`, (error) => {
|
||||
clearTimeout(timeout);
|
||||
reject(new Error(error.message));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// ✨ НОВОЕ: Запуск автоматического worker
|
||||
startWorker() {
|
||||
if (this.workerInterval) {
|
||||
logger.warn('[AIQueue] Worker уже запущен');
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info('[AIQueue] 🚀 Запуск worker для обработки очереди...');
|
||||
|
||||
this.workerInterval = setInterval(() => {
|
||||
this.processNextTask();
|
||||
}, this.checkInterval); // Интервал из централизованных настроек
|
||||
}
|
||||
|
||||
// ✨ НОВОЕ: Остановка worker
|
||||
stopWorker() {
|
||||
if (this.workerInterval) {
|
||||
clearInterval(this.workerInterval);
|
||||
this.workerInterval = null;
|
||||
logger.info('[AIQueue] ⏹️ Worker остановлен');
|
||||
}
|
||||
}
|
||||
|
||||
// ✨ НОВОЕ: Обработка следующей задачи из очереди
|
||||
async processNextTask() {
|
||||
if (this.isProcessing) return;
|
||||
|
||||
const task = this.getNextRequest();
|
||||
if (!task) return;
|
||||
|
||||
this.isProcessing = true;
|
||||
const startTime = Date.now();
|
||||
|
||||
try {
|
||||
logger.info(`[AIQueue] Обработка задачи ${task.id}`);
|
||||
|
||||
// 1. Проверяем кэш
|
||||
const cacheKey = aiCache.generateKey(task.request.messages);
|
||||
const cached = aiCache.get(cacheKey);
|
||||
|
||||
if (cached) {
|
||||
logger.info(`[AIQueue] Cache HIT для задачи ${task.id}`);
|
||||
const responseTime = Date.now() - startTime;
|
||||
|
||||
this.updateRequestStatus(task.id, 'completed', cached, null, responseTime);
|
||||
this.emit(`task_${task.id}_completed`, { response: cached, fromCache: true });
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. Вызываем Ollama API
|
||||
const ollamaUrl = ollamaConfig.getBaseUrl();
|
||||
const timeouts = ollamaConfig.getTimeouts();
|
||||
|
||||
const response = await axios.post(`${ollamaUrl}/api/chat`, {
|
||||
model: task.request.model || ollamaConfig.getDefaultModel(),
|
||||
messages: task.request.messages,
|
||||
stream: false
|
||||
}, {
|
||||
timeout: timeouts.ollamaChat
|
||||
});
|
||||
|
||||
const result = response.data.message.content;
|
||||
const responseTime = Date.now() - startTime;
|
||||
|
||||
// 3. Сохраняем в кэш
|
||||
aiCache.set(cacheKey, result);
|
||||
|
||||
// 4. Обновляем статус
|
||||
this.updateRequestStatus(task.id, 'completed', result, null, responseTime);
|
||||
this.emit(`task_${task.id}_completed`, { response: result, fromCache: false });
|
||||
|
||||
logger.info(`[AIQueue] ✅ Задача ${task.id} выполнена за ${responseTime}ms`);
|
||||
|
||||
} catch (error) {
|
||||
logger.error(`[AIQueue] ❌ Ошибка задачи ${task.id}:`, error.message);
|
||||
|
||||
this.updateRequestStatus(task.id, 'failed', null, error.message);
|
||||
this.emit(`task_${task.id}_failed`, { message: error.message });
|
||||
|
||||
} finally {
|
||||
this.isProcessing = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = AIQueue;
|
||||
Reference in New Issue
Block a user