Files
DLE/backend/services/telegramBot.js

490 lines
23 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* Copyright (c) 2024-2025 Тарабанов Александр Викторович
* All rights reserved.
*
* This software is proprietary and confidential.
* Unauthorized copying, modification, or distribution is prohibited.
*
* For licensing inquiries: info@hb3-accelerator.com
* Website: https://hb3-accelerator.com
* GitHub: https://github.com/HB3-ACCELERATOR
*/
const { Telegraf } = require('telegraf');
const logger = require('../utils/logger');
const db = require('../db');
const authService = require('./auth-service');
const verificationService = require('./verification-service');
const crypto = require('crypto');
const identityService = require('./identity-service');
const aiAssistant = require('./ai-assistant');
const { checkAdminRole } = require('./admin-role');
const { broadcastContactsUpdate } = require('../wsHub');
const aiAssistantSettingsService = require('./aiAssistantSettingsService');
const { ragAnswer, generateLLMResponse } = require('./ragService');
const { isUserBlocked } = require('../utils/userUtils');
let botInstance = null;
let telegramSettingsCache = null;
async function getTelegramSettings() {
if (telegramSettingsCache) return telegramSettingsCache;
const { rows } = await db.getQuery()('SELECT * FROM telegram_settings ORDER BY id LIMIT 1');
if (!rows.length) throw new Error('Telegram settings not found in DB');
telegramSettingsCache = rows[0];
return telegramSettingsCache;
}
// Создание и настройка бота
async function getBot() {
if (!botInstance) {
const settings = await getTelegramSettings();
botInstance = new Telegraf(settings.bot_token);
// Обработка команды /start
botInstance.command('start', (ctx) => {
ctx.reply('Добро пожаловать! Отправьте код подтверждения для аутентификации.');
});
// Универсальный обработчик текстовых сообщений
botInstance.on('text', async (ctx) => {
const text = ctx.message.text.trim();
// 1. Если команда — пропустить
if (text.startsWith('/')) return;
// 2. Проверка: это потенциальный код?
const isPotentialCode = (str) => /^[A-Z0-9]{6}$/i.test(str);
if (isPotentialCode(text)) {
try {
// Получаем код верификации для всех активных кодов с провайдером telegram
const codeResult = await db.getQuery()(
`SELECT * FROM verification_codes
WHERE code = $1
AND provider = 'telegram'
AND used = false
AND expires_at > NOW()`,
[text.toUpperCase()]
);
if (codeResult.rows.length === 0) {
ctx.reply('Неверный код подтверждения');
return;
}
const verification = codeResult.rows[0];
const providerId = verification.provider_id;
const linkedUserId = verification.user_id; // Получаем связанный userId если он есть
let userId;
let userRole = 'user'; // Роль по умолчанию
// Отмечаем код как использованный
await db.getQuery()('UPDATE verification_codes SET used = true WHERE id = $1', [
verification.id,
]);
logger.info('Starting Telegram auth process for code:', text);
// Проверяем, существует ли уже пользователь с таким Telegram ID
const existingTelegramUser = await db.getQuery()(
`SELECT ui.user_id
FROM user_identities ui
WHERE ui.provider = 'telegram' AND ui.provider_id = $1`,
[ctx.from.id.toString()]
);
if (existingTelegramUser.rows.length > 0) {
// Если пользователь с таким Telegram ID уже существует, используем его
userId = existingTelegramUser.rows[0].user_id;
logger.info(`Using existing user ${userId} for Telegram account ${ctx.from.id}`);
} else {
// Если код верификации был связан с существующим пользователем, используем его
if (linkedUserId) {
// Используем userId из кода верификации
userId = linkedUserId;
// Связываем Telegram с этим пользователем
await db.getQuery()(
`INSERT INTO user_identities
(user_id, provider, provider_id, created_at)
VALUES ($1, $2, $3, NOW())`,
[userId, 'telegram', ctx.from.id.toString()]
);
logger.info(
`Linked Telegram account ${ctx.from.id} to pre-authenticated user ${userId}`
);
} else {
// Проверяем, есть ли пользователь, связанный с гостевым идентификатором
let existingUserWithGuestId = null;
if (providerId) {
const guestUserResult = await db.getQuery()(
`SELECT user_id FROM guest_user_mapping WHERE guest_id = $1`,
[providerId]
);
if (guestUserResult.rows.length > 0) {
existingUserWithGuestId = guestUserResult.rows[0].user_id;
logger.info(
`Found existing user ${existingUserWithGuestId} by guest ID ${providerId}`
);
}
}
if (existingUserWithGuestId) {
// Используем существующего пользователя и добавляем ему Telegram идентификатор
userId = existingUserWithGuestId;
await db.getQuery()(
`INSERT INTO user_identities
(user_id, provider, provider_id, created_at)
VALUES ($1, $2, $3, NOW())`,
[userId, 'telegram', ctx.from.id.toString()]
);
logger.info(`Linked Telegram account ${ctx.from.id} to existing user ${userId}`);
} else {
// Создаем нового пользователя, если не нашли существующего
const userResult = await db.getQuery()(
'INSERT INTO users (created_at, role) VALUES (NOW(), $1) RETURNING id',
['user']
);
userId = userResult.rows[0].id;
// Связываем Telegram с новым пользователем
await db.getQuery()(
`INSERT INTO user_identities
(user_id, provider, provider_id, created_at)
VALUES ($1, $2, $3, NOW())`,
[userId, 'telegram', ctx.from.id.toString()]
);
// Если был гостевой ID, связываем его с новым пользователем
if (providerId) {
await db.getQuery()(
`INSERT INTO guest_user_mapping
(user_id, guest_id)
VALUES ($1, $2)
ON CONFLICT (guest_id) DO UPDATE SET user_id = $1`,
[userId, providerId]
);
}
logger.info(`Created new user ${userId} with Telegram account ${ctx.from.id}`);
}
}
}
// ----> НАЧАЛО: Проверка роли на основе привязанного кошелька <----
if (userId) { // Убедимся, что userId определен
logger.info(`[TelegramBot] Checking linked wallet for determined userId: ${userId} (Type: ${typeof userId})`);
try {
const linkedWallet = await authService.getLinkedWallet(userId);
if (linkedWallet) {
logger.info(`[TelegramBot] Found linked wallet ${linkedWallet} for user ${userId}. Checking role...`);
const isAdmin = await checkAdminRole(linkedWallet);
userRole = isAdmin ? 'admin' : 'user';
logger.info(`[TelegramBot] Role for user ${userId} determined as: ${userRole}`);
// Опционально: Обновить роль в таблице users
const currentUser = await db.getQuery()('SELECT role FROM users WHERE id = $1', [userId]);
if (currentUser.rows.length > 0 && currentUser.rows[0].role !== userRole) {
await db.getQuery()('UPDATE users SET role = $1 WHERE id = $2', [userRole, userId]);
logger.info(`[TelegramBot] Updated user role in DB to ${userRole}`);
}
} else {
logger.info(`[TelegramBot] No linked wallet found for user ${userId}. Checking current DB role.`);
// Если кошелька нет, берем текущую роль из базы
const currentUser = await db.getQuery()('SELECT role FROM users WHERE id = $1', [userId]);
if (currentUser.rows.length > 0) {
userRole = currentUser.rows[0].role;
}
}
} catch (roleCheckError) {
logger.error(`[TelegramBot] Error checking admin role for user ${userId}:`, roleCheckError);
// В случае ошибки берем роль из базы или оставляем 'user'
try {
const currentUser = await db.getQuery()('SELECT role FROM users WHERE id = $1', [userId]);
if (currentUser.rows.length > 0) { userRole = currentUser.rows[0].role; }
} catch (dbError) { /* ignore */ }
}
} else {
logger.error('[TelegramBot] Cannot check role because userId is undefined!');
}
// ----> КОНЕЦ: Проверка роли <----
// Логируем userId перед обновлением сессии
logger.info(`[telegramBot] Attempting to update session for userId: ${userId}`);
// Находим последнюю активную сессию для данного userId
let activeSessionId = null;
try {
// Ищем сессию, где есть userId и она не истекла (проверка expires_at)
// Сортируем по expires_at DESC чтобы взять самую "свежую", если их несколько
const sessionResult = await db.getQuery()(
`SELECT sid FROM session
WHERE sess ->> 'userId' = $1
AND expire > NOW()
ORDER BY expire DESC
LIMIT 1`,
[userId?.toString()] // Используем optional chaining и преобразуем в строку
);
if (sessionResult.rows.length > 0) {
activeSessionId = sessionResult.rows[0].sid;
logger.info(`[telegramBot] Found active session ID ${activeSessionId} for user ${userId}`);
// Обновляем найденную сессию в базе данных, добавляя/перезаписывая данные Telegram
const updateResult = await db.getQuery()(
`UPDATE session
SET sess = (sess::jsonb || $1::jsonb)::json
WHERE sid = $2`,
[
JSON.stringify({
// authenticated: true, // Не перезаписываем, т.к. сессия уже должна быть аутентифицирована
authType: 'telegram', // Обновляем тип аутентификации
telegramId: ctx.from.id.toString(),
telegramUsername: ctx.from.username,
telegramFirstName: ctx.from.first_name,
role: userRole, // Записываем определенную роль
// userId: userId?.toString() // userId уже должен быть в сессии
}),
activeSessionId // Обновляем по найденному session ID
]
);
if (updateResult.rowCount > 0) {
logger.info(`[telegramBot] Session ${activeSessionId} updated successfully with Telegram data for user ${userId}`);
} else {
logger.warn(`[telegramBot] Session update query executed but did not update rows for sid: ${activeSessionId}. This might indicate a concurrency issue or incorrect sid.`);
}
} else {
logger.warn(`[telegramBot] No active web session found for userId: ${userId}. Telegram is linked, but the user might need to refresh their browser session.`);
}
} catch(sessionError) {
logger.error(`[telegramBot] Error finding or updating session for userId ${userId}:`, sessionError);
}
// Отправляем сообщение об успешной аутентификации
await ctx.reply('Аутентификация успешна! Можете вернуться в приложение.');
// Удаляем сообщение с кодом
try {
await ctx.deleteMessage(ctx.message.message_id);
} catch (error) {
logger.warn('Could not delete code message:', error);
}
// После каждого успешного создания пользователя:
broadcastContactsUpdate();
} catch (error) {
logger.error('Error in Telegram auth:', error);
await ctx.reply('Произошла ошибка при аутентификации. Попробуйте позже.');
}
return;
}
// 3. Всё остальное — чат с ИИ-ассистентом
try {
const telegramId = ctx.from.id.toString();
// 1. Найти или создать пользователя
const { userId, role } = await identityService.findOrCreateUserWithRole('telegram', telegramId);
if (await isUserBlocked(userId)) {
await ctx.reply('Вы заблокированы. Сообщения не принимаются.');
return;
}
// 1.1 Найти или создать беседу
let conversationResult = await db.getQuery()(
'SELECT * FROM conversations WHERE user_id = $1 ORDER BY updated_at DESC, created_at DESC LIMIT 1',
[userId]
);
let conversation;
if (conversationResult.rows.length === 0) {
const title = `Чат с пользователем ${userId}`;
const newConv = await db.getQuery()(
'INSERT INTO conversations (user_id, title, created_at, updated_at) VALUES ($1, $2, NOW(), NOW()) RETURNING *',
[userId, title]
);
conversation = newConv.rows[0];
} else {
conversation = conversationResult.rows[0];
}
// 2. Сохранять все сообщения с conversation_id
let content = text;
let attachmentMeta = {};
// Проверяем вложения (фото, документ, аудио, видео)
let fileId, fileName, mimeType, fileSize, attachmentBuffer;
if (ctx.message.document) {
fileId = ctx.message.document.file_id;
fileName = ctx.message.document.file_name;
mimeType = ctx.message.document.mime_type;
fileSize = ctx.message.document.file_size;
} else if (ctx.message.photo && ctx.message.photo.length > 0) {
// Берём самое большое фото
const photo = ctx.message.photo[ctx.message.photo.length - 1];
fileId = photo.file_id;
fileName = 'photo.jpg';
mimeType = 'image/jpeg';
fileSize = photo.file_size;
} else if (ctx.message.audio) {
fileId = ctx.message.audio.file_id;
fileName = ctx.message.audio.file_name || 'audio.ogg';
mimeType = ctx.message.audio.mime_type || 'audio/ogg';
fileSize = ctx.message.audio.file_size;
} else if (ctx.message.video) {
fileId = ctx.message.video.file_id;
fileName = ctx.message.video.file_name || 'video.mp4';
mimeType = ctx.message.video.mime_type || 'video/mp4';
fileSize = ctx.message.video.file_size;
}
if (fileId) {
// Скачиваем файл
const fileLink = await ctx.telegram.getFileLink(fileId);
const res = await fetch(fileLink.href);
attachmentBuffer = await res.buffer();
attachmentMeta = {
attachment_filename: fileName,
attachment_mimetype: mimeType,
attachment_size: fileSize,
attachment_data: attachmentBuffer
};
}
// Сохраняем сообщение в БД
await db.getQuery()(
`INSERT INTO messages (user_id, conversation_id, sender_type, content, channel, role, direction, created_at, attachment_filename, attachment_mimetype, attachment_size, attachment_data)
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW(), $8, $9, $10, $11)`,
[userId, conversation.id, 'user', content, 'telegram', role, 'in',
attachmentMeta.attachment_filename || null,
attachmentMeta.attachment_mimetype || null,
attachmentMeta.attachment_size || null,
attachmentMeta.attachment_data || null
]
);
if (await isUserBlocked(userId)) {
logger.info(`[TelegramBot] Пользователь ${userId} заблокирован — ответ ИИ не отправляется.`);
return;
}
// 3. Получить ответ от ИИ (RAG + LLM)
const aiSettings = await aiAssistantSettingsService.getSettings();
let ragTableId = null;
if (aiSettings && aiSettings.selected_rag_tables) {
ragTableId = Array.isArray(aiSettings.selected_rag_tables)
? aiSettings.selected_rag_tables[0]
: aiSettings.selected_rag_tables;
}
let aiResponse;
if (ragTableId) {
// Сначала ищем ответ через RAG
const ragResult = await ragAnswer({ tableId: ragTableId, userQuestion: content });
if (ragResult && ragResult.answer && typeof ragResult.score === 'number' && Math.abs(ragResult.score) <= 0.3) {
aiResponse = ragResult.answer;
} else {
aiResponse = await generateLLMResponse({
userQuestion: content,
context: ragResult && ragResult.context ? ragResult.context : '',
answer: ragResult && ragResult.answer ? ragResult.answer : '',
systemPrompt: aiSettings ? aiSettings.system_prompt : '',
history: null,
model: aiSettings ? aiSettings.model : undefined,
language: aiSettings && aiSettings.languages && aiSettings.languages.length > 0 ? aiSettings.languages[0] : 'ru'
});
}
} else {
aiResponse = await aiAssistant.getResponse(content, 'auto');
}
// 4. Сохранить ответ в БД с conversation_id
await db.getQuery()(
`INSERT INTO messages (user_id, conversation_id, sender_type, content, channel, role, direction, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())`,
[userId, conversation.id, 'assistant', aiResponse, 'telegram', role, 'out']
);
// 5. Отправить ответ пользователю
await ctx.reply(aiResponse);
} catch (error) {
logger.error('[TelegramBot] Ошибка при обработке сообщения:', error);
await ctx.reply('Произошла ошибка при обработке вашего сообщения. Попробуйте позже.');
}
});
// Запуск бота
await botInstance.launch();
logger.info('[TelegramBot] Бот запущен');
}
return botInstance;
}
// Остановка бота
async function stopBot() {
if (botInstance) {
try {
await botInstance.stop();
botInstance = null;
logger.info('Telegram bot stopped successfully');
} catch (error) {
logger.error('Error stopping Telegram bot:', error);
throw error;
}
}
}
// Инициализация процесса аутентификации
async function initTelegramAuth(session) {
try {
// Используем временный идентификатор для создания кода верификации
// Реальный пользователь будет создан или найден при проверке кода через бота
const tempId = crypto.randomBytes(16).toString('hex');
// Если пользователь уже авторизован, сохраняем его userId в guest_user_mapping
// чтобы потом при авторизации через бота этот пользователь был найден
if (session && session.authenticated && session.userId) {
const guestId = session.guestId || tempId;
// Связываем гостевой ID с текущим пользователем
await db.getQuery()(
`INSERT INTO guest_user_mapping (user_id, guest_id)
VALUES ($1, $2)
ON CONFLICT (guest_id) DO UPDATE SET user_id = $1`,
[session.userId, guestId]
);
logger.info(
`[initTelegramAuth] Linked guestId ${guestId} to authenticated user ${session.userId}`
);
}
// Создаем код через сервис верификации с идентификатором
const code = await verificationService.createVerificationCode(
'telegram',
session.guestId || tempId,
session.authenticated ? session.userId : null
);
logger.info(
`[initTelegramAuth] Created verification code for guestId: ${session.guestId || tempId}${session.authenticated ? `, userId: ${session.userId}` : ''}`
);
const settings = await getTelegramSettings();
return {
verificationCode: code,
botLink: `https://t.me/${settings.bot_username}`,
};
} catch (error) {
logger.error('Error initializing Telegram auth:', error);
throw error;
}
}
function clearSettingsCache() {
telegramSettingsCache = null;
}
async function getAllBots() {
const { rows } = await db.getQuery()('SELECT id, bot_username FROM telegram_settings ORDER BY id');
return rows;
}
module.exports = {
getTelegramSettings,
getBot,
stopBot,
initTelegramAuth,
clearSettingsCache,
getAllBots,
};