Предисловие
Данный проект был разработан для доступа к данным. Что бы lmstudio имела доступ к данным которые есть у пользователя. Так же там использовался lmstudio клиент о котором я писал раньше. Но такой прокси оказался слишком простым и мало подходил для моих задач. Но может кому его окажется достаточно. Это первая версия. Есть вторая. Вторая версия гораздо сложнее и способна выполнять некоторые действия. Принцип работы как mcp-сервисы в lmstudio. Но они работают не локально а по сети. Код в полном объеме представлен в конце. Так же там сервис для примера работы.
Введение
Современные большие языковые модели (LLM), такие как Gemma, Mistral, Llama или OpenAI GPT, способны анализировать текст и отвечать на запросы пользователя. Однако у них нет прямого доступа к локальным или корпоративным данным — например, к базе 1С, трекеру судов, корпоративной почте или локальным документам.
Скрипт lmstudio-relay.py решает эту задачу. Он работает как умный HTTP/WebSocket-прокси, который стоит между моделью LM Studio и пользователем, автоматически анализируя запросы и подключая внешние источники данных при необходимости.
Основная идея
Когда клиент (например, веб-интерфейс или мобильное приложение) отправляет запрос к LM Studio API (обычно POST /v1/chat/completions), этот прокси:
Перехватывает запрос.
Извлекает пользовательское сообщение.
Определяет, какие внешние источники (ресурсы) могут быть полезны для ответа.
Подгружает данные из этих источников (HTTP-запросы, файлы и т. д.).
Формирует расширенный запрос с дополненной контекстной информацией.
Перенаправляет его в LM Studio, чтобы модель ответила максимально информированно.
Возвращает результат пользователю.
Архитектура
Клиент (Web/Android)
│
▼
LMStudio Relay
(умный прокси-сервер)
│
├──> LM Studio API (модель LLM)
└──> Внешние сервисы (HTTP, файлы, БД и т.д.)
Ключевые компоненты кода
1. ResourceManager
Модуль управления источниками данных (resources.json).
Он загружает описание всех доступных ресурсов — от HTTP-сервисов до файловых хранилищ.
? Пример ресурса:
{
"resources": [
{
"name": "vessels_tracker",
"description": "Сервис отслеживания морских судов в реальном времени",
"type": "http",
"url": "http://127.0.0.1:12323/vessels/latest"
},
{
"name": "1c_sales",
"description": "Данные о продажах из 1С",
"type": "http",
"url": "http://127.0.0.1:12325/query"
}
}
ResourceManager:
Загружает файл ресурсов с поддержкой UTF-8 BOM.
Позволяет получать список ресурсов и описание каждого.
Выдает контент из ресурсов (HTTP-запросом или чтением файла).
2. RequestPreprocessor
Этот класс добавляет “интеллектуальный слой” между пользователем и моделью.
Он делает три вещи:
-
Определяет, какие ресурсы могут быть релевантны вопросу пользователя.
Для этого он сам обращается к LLM (через/v1/chat/completions) и просит её вернуть JSON-список нужных источников.? Пример запроса к модели:
{ "messages": [ {"role": "system", "content": "You are a resource selection assistant..."}, {"role": "user", "content": "Покажи мне последние письма от поставщика"} ] }? Пример ожидаемого ответа:
{ "selected_resources": ["email_service"], "reasoning": "User asked about recent emails" } Загружает данные из выбранных ресурсов (например, результаты поиска или данные из 1С).
-
Строит расширенный промпт для модели, добавляя эти данные как контекст:
User question: Покажи мне последние письма от поставщика Relevant information from available resources: --- email_service --- Content from email_service: [{"from": "supplier@domain.com", "subject": "Invoice"}]... --- Instructions --- Based on the information above, please answer the user's question directly.
3. Прокси-обработка запросов
Основная часть FastAPI-приложения (@app.api_route("/{full_path:path}")) перехватывает все HTTP-запросы:
Если запрос не связан с чатом, он просто перенаправляется.
-
Если это запрос к
v1/chat/completions, то:Извлекается сообщение пользователя.
Активируется препроцессор (выбор и загрузка ресурсов).
Формируется новый запрос с дополненным контекстом.
Запрос пересылается в LM Studio.
Ответ возвращается клиенту.
Также реализован WebSocket-прокси, чтобы поддерживать потоковые ответы (stream=True) — например, для чат-интерфейсов.
4. Файл resources.json
Файл ресурсов определяет, к каким источникам прокси может обращаться.
Например:
vessels_tracker— сервис трекинга судовchroma_vector_db— векторная база для поиска по embedding’ам1c_sales,1c_documents— API-интерфейсы 1Сfile_reader— чтение локальных файлов
Благодаря этому Relay может "обогащать" модель актуальными корпоративными данными.
Пример работы
? Пользователь вводит в LM Studio:
Покажи последние письма от поставщика и проверь, есть ли новые отгрузки.
Relay:
Понимает, что нужны данные из
email_serviceи1c_documents.-
Запрашивает:
Формирует объединённый контекст с этими данными.
Отправляет в LM Studio новый, обогащённый запрос.
Модель отвечает уже на основе актуальных данных, а не только своего внутреннего контекста.
Применение
Этот прокси можно использовать:
? В корпоративных чат-ботах, где LLM обращается к CRM/1С/БД.
? В системах анализа данных — подгрузка отчётов, email, логов.
? В информационных панелях, где AI-компонент объединяет данные из разных источников.
? В автоматизированных помощниках для отдела продаж, бухгалтерии или логистики.
Технические детали
Основан на FastAPI + httpx + websockets.
Поддерживает HTTP и WebSocket-прокси.
Логирует все операции с подробностями и сохраняет финальные запросы для аудита.
-
Настраивается через переменные окружения:
LMSTUDIO_TARGET_HOST— хост LM StudioLISTEN_PORT— порт, на котором слушает relayRESOURCES_FILE— путь к JSON с ресурсами
Вывод
Скрипт lmstudio-relay.py — это “умный посредник” между моделью LM Studio и внешними данными.
Он делает язык модели контекстно осведомлённым, подключая реальные сервисы и базы.
По сути, это шаг к AGI-архитектуре, где LLM становится не просто генератором текста, а центром принятия решений, взаимодействующим с инфраструктурой.
Код программы.
#!/usr/bin/env python3
import os
import asyncio
import logging
import json
import time
import datetime
from typing import Dict, List, Any, Optional
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect, Response
from fastapi.responses import PlainTextResponse
import httpx
import websockets
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
# --- config ---
LMSTUDIO_TARGET_HOST = os.getenv("LMSTUDIO_TARGET_HOST", "127.0.0.1")
LMSTUDIO_TARGET_PORT = int(os.getenv("LMSTUDIO_TARGET_PORT", "1234"))
LISTEN_HOST = os.getenv("LISTEN_HOST", "0.0.0.0")
LISTEN_PORT = int(os.getenv("LISTEN_PORT", "1235"))
TARGET_USE_HTTPS = os.getenv("TARGET_USE_HTTPS", "false").lower() in ("1", "true", "yes")
TARGET_USE_WSS = os.getenv("TARGET_USE_WSS", "false").lower() in ("1", "true", "yes")
PROXY_TIMEOUT = float(os.getenv("PROXY_TIMEOUT", "300.0"))
RESOURCES_FILE = os.getenv("RESOURCES_FILE", "resources.json")
# --- logging setup ---
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=getattr(logging, log_level, logging.INFO),
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s"
)
logger = logging.getLogger("lmstudio-relay")
app = FastAPI()
# --- Resource Management ---
class ResourceManager:
def __init__(self, resources_file: str):
self.resources_file = resources_file
self.resources = self.load_resources()
def load_resources(self) -> Dict[str, Any]:
try:
# Используем utf-8-sig для обработки BOM
with open(self.resources_file, 'r', encoding='utf-8-sig') as f:
data = json.load(f)
logger.info(f"Loaded {len(data.get('resources', []))} resources from {self.resources_file}")
return data
except FileNotFoundError:
logger.warning(f"Resources file {self.resources_file} not found.")
return {"resources": []}
except json.JSONDecodeError as e:
logger.error(f"Error parsing resources file {self.resources_file}: {e}")
return {"resources": []}
def get_resources_description(self) -> str:
if not self.resources.get("resources"):
return "No resources available."
desc = "Available resources:\n"
for r in self.resources["resources"]:
desc += f"- {r['name']}: {r.get('description','')}"
if 'url' in r:
desc += f" (URL: {r['url']})"
desc += "\n"
return desc
def get_resource_by_name(self, name: str) -> Optional[Dict[str, Any]]:
for resource in self.resources.get("resources", []):
if resource.get("name") == name:
return resource
return None
resource_manager = ResourceManager(RESOURCES_FILE)
# --- Preprocessor ---
class RequestPreprocessor:
def __init__(self, resource_manager: ResourceManager):
self.resource_manager = resource_manager
async def create_resource_selection_prompt(self, user_message: str) -> Dict[str, Any]:
resources_description = self.resource_manager.get_resources_description()
system_prompt = f"""
You are a resource selection assistant. Analyze the user's query and determine which of the available resources might be relevant for answering it.
{resources_description}
Return ONLY valid JSON without any additional text:
{{
"selected_resources": ["name1", "name2"],
"reasoning": "short reason"
}}
"""
return {
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message}
],
"temperature": 0.1,
"max_tokens": 8000,
"stream": False
}
async def select_resources(self, user_message: str, target_base: str) -> List[str]:
try:
selection_prompt = await self.create_resource_selection_prompt(user_message)
url = f"{target_base}/v1/chat/completions"
logger.info(f"Selecting resources via {url}")
logger.debug(f"Selection prompt: {json.dumps(selection_prompt, ensure_ascii=False)[:500]}")
async with httpx.AsyncClient(timeout=PROXY_TIMEOUT) as client:
response = await client.post(url, json=selection_prompt)
logger.error(f"LMStudio response {response.status_code}: {response.text}")
# Логируем детали ошибки если есть
if response.status_code != 200:
error_detail = response.text
logger.error(f"LM Studio returned {response.status_code}: {error_detail}")
response.raise_for_status()
result = response.json()
content = result["choices"][0]["message"]["content"]
logger.debug(f"Raw selection response: {content}")
try:
# Пытаемся найти JSON в ответе если есть лишний текст
import re
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
content = json_match.group()
parsed = json.loads(content)
selected = parsed.get("selected_resources", [])
logger.info(f"Selected resources: {selected}")
return selected
except json.JSONDecodeError as e:
logger.error(f"Failed to parse resource selection JSON: {e}, content: {content}")
return []
except Exception as e:
logger.exception(f"Resource selection failed: {e}")
return []
async def fetch_resource_content(self, resource_name: str) -> str:
"""Реальная реализация получения контента из ресурса"""
resource = self.resource_manager.get_resource_by_name(resource_name)
if not resource:
return f"Resource '{resource_name}' not found"
try:
# Для HTTP ресурсов
if resource.get('type') == 'http' and 'url' in resource:
async with httpx.AsyncClient(timeout=60.0) as client:
# Определяем метод запроса на основе эндпоинта
method = resource.get('method', 'POST') # По умолчанию POST для vessels
# Для эндпоинтов /vessels/ используем POST с телом запроса
if '/vessels/' in resource['url']:
# Отправляем POST запрос с минимальным телом
post_data = resource.get('post_data', {})
response = await client.request(
method='POST', # Явно указываем POST
url=resource['url'],
json=post_data,
headers=resource.get('headers', {})
)
else:
# Для других эндпоинтов используем GET
response = await client.get(resource['url'])
response.raise_for_status()
content = response.json() if response.content else "No content"
return f"Content from {resource_name}: {json.dumps(content, ensure_ascii=False)[:1000]}..."
# Для файловых ресурсов
elif resource.get('type') == 'file' and 'path' in resource:
with open(resource['path'], 'r', encoding='utf-8') as f:
content = f.read()
return f"Content from {resource_name}: {content[:1000]}..."
else:
return f"Resource '{resource_name}': Description - {resource.get('description', 'No description')}"
except Exception as e:
logger.error(f"Error fetching resource {resource_name}: {e}")
return f"Error fetching resource '{resource_name}': {str(e)}"
async def build_enhanced_prompt(self, user_message: str, selected_resources: List[str]) -> str:
if not selected_resources:
return user_message
enhanced = f"User question: {user_message}\n\nRelevant information from available resources:\n"
# Параллельно получаем контент из всех выбранных ресурсов
tasks = [self.fetch_resource_content(resource) for resource in selected_resources]
results = await asyncio.gather(*tasks, return_exceptions=True)
resource_contents = []
for resource, content in zip(selected_resources, results):
if isinstance(content, Exception):
content = f"Error: {str(content)}"
# Ограничиваем длину контента и очищаем его
if len(str(content)) > 8000:
content = str(content)[:8000] + "... [truncated]"
resource_contents.append(f"--- {resource} ---\n{content}")
# Добавляем все ресурсы
enhanced += "\n\n".join(resource_contents)
# Четкая инструкция для модели
enhanced += f"\n\n--- Instructions ---\nBased on the information above, please answer the user's question directly and concisely."
logger.info(f"Enhanced prompt length: {len(enhanced)} characters")
return enhanced
preprocessor = RequestPreprocessor(resource_manager)
# --- Proxy helpers ---
def build_target_base():
scheme = "https" if TARGET_USE_HTTPS else "http"
return f"{scheme}://{LMSTUDIO_TARGET_HOST}:{LMSTUDIO_TARGET_PORT}"
def build_target_ws_base():
scheme = "wss" if TARGET_USE_WSS else "ws"
return f"{scheme}://{LMSTUDIO_TARGET_HOST}:{LMSTUDIO_TARGET_PORT}"
def safe_log_body(body: bytes) -> str:
"""Безопасное логирование тела запроса"""
try:
body_str = body.decode('utf-8', errors='replace')
return body_str[:2000]
except Exception:
return f"Binary data ({len(body)} bytes)"
# Hop-by-hop headers that shouldn't be forwarded
HOP_BY_HOP_HEADERS = {
"connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
"te", "trailers", "transfer-encoding", "upgrade"
}
# --- Helper functions ---
async def should_preprocess(data: Dict[str, Any]) -> bool:
return "messages" in data and any(m.get("role") == "user" for m in data["messages"])
def extract_user_message(data: Dict[str, Any]) -> Optional[str]:
for m in reversed(data.get("messages", [])):
if m.get("role") == "user" and m.get("content"):
return m["content"]
return None
def modify_chat_request(data: Dict[str, Any], new_prompt: str) -> Dict[str, Any]:
"""Создаем чистый запрос с system сообщением и новым user сообщением"""
modified_data = json.loads(json.dumps(data)) # Deep copy
# Сохраняем system сообщение если есть
system_messages = [msg for msg in modified_data.get('messages', []) if msg.get('role') == 'system']
# Создаем новый clean контекст
new_messages = []
# Добавляем system сообщение если было
if system_messages:
new_messages.append(system_messages[0])
# Добавляем новое user сообщение с enhanced prompt
new_messages.append({
"role": "user",
"content": new_prompt
})
modified_data['messages'] = new_messages
# Убедимся, что структура корректна
if "model" not in modified_data:
modified_data["model"] = "google/gemma-3-12b"
if "max_tokens" not in modified_data:
modified_data["max_tokens"] = 8000
if "stream" not in modified_data:
modified_data["stream"] = False
logger.debug(f"Clean context created: {len(new_messages)} messages")
for i, msg in enumerate(new_messages):
logger.debug(f"Message {i}: role={msg.get('role')}, content_len={len(str(msg.get('content', '')))}")
return modified_data
# --- Proxy logic ---
@app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "HEAD"])
async def proxy_all(request: Request, full_path: str):
start_time = time.time()
client_host = getattr(request.client, "host", "unknown")
logger.info(f"Incoming {request.method} {full_path} from {client_host}")
# Route non-chat endpoints through simple proxy
if "v1/chat/completions" not in full_path:
return await simple_proxy(request, full_path)
body = await request.body()
try:
data = json.loads(body.decode("utf-8")) if body else {}
except Exception as e:
logger.warning(f"Failed to parse request body as JSON: {e}")
data = {}
# Preprocess chat requests if needed
if await should_preprocess(data):
user_msg = extract_user_message(data)
if user_msg:
logger.info("Preprocessing chat completion request...")
try:
selected = await preprocessor.select_resources(user_msg, build_target_base())
if selected:
new_prompt = await preprocessor.build_enhanced_prompt(user_msg, selected)
data = modify_chat_request(data, new_prompt)
body = json.dumps(data, ensure_ascii=False).encode("utf-8")
logger.info(f"Enhanced prompt built using {len(selected)} resources")
# ДЕТАЛЬНОЕ ЛОГИРОВАНИЕ ТОГО, ЧТО ОТПРАВЛЯЕТСЯ ВО ВТОРОЙ РАЗ
logger.debug("=== FINAL REQUEST TO LM STUDIO ===")
logger.debug(f"URL: {build_target_base()}/{full_path}")
logger.debug(f"Method: {request.method}")
logger.debug("=== REQUEST SUMMARY ===")
logger.debug(f"Model: {data.get('model')}")
logger.debug(f"Stream: {data.get('stream')}")
logger.debug(f"Max tokens: {data.get('max_tokens')}")
logger.debug(f"Temperature: {data.get('temperature')}")
logger.debug(f"Total messages: {len(data.get('messages', []))}")
for i, msg in enumerate(data.get('messages', [])):
role = msg.get('role', 'unknown')
content = msg.get('content', '')
logger.debug(f"Message {i} ({role}): {content[:200]}{'...' if len(content) > 200 else ''}")
logger.debug("=== END SUMMARY ===")
# Сохраняем полный запрос в файл
try:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"lmstudio_request_{timestamp}.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.info(f"Full request saved to {filename}")
except Exception as e:
logger.error(f"Failed to save request: {e}")
else:
logger.info("No resources selected, using original prompt")
except Exception as e:
logger.error(f"Preprocessing failed: {e}")
# Продолжаем с оригинальным запросом в случае ошибки
target_url = f"{build_target_base()}/{full_path}"
logger.info(f"Proxying request -> {target_url}")
logger.debug(f"Body: {safe_log_body(body)}")
try:
async with httpx.AsyncClient(timeout=PROXY_TIMEOUT) as client:
# Build headers for upstream request
upstream_headers = {k: v for k, v in request.headers.items()
if k.lower() not in HOP_BY_HOP_HEADERS}
upstream_headers.pop("content-length", None)
# Логируем фактически отправляемые данные
logger.error("=== ACTUAL REQUEST BEING SENT TO LM STUDIO ===")
logger.error(f"Request body size: {len(body)} bytes")
logger.error(f"First 1000 chars of body: {body.decode('utf-8', errors='replace')[:1000]}")
logger.error("=== END ACTUAL REQUEST ===")
resp = await client.request(
request.method,
target_url,
headers=upstream_headers,
content=body
)
duration = time.time() - start_time
logger.info(f"Received {resp.status_code} from LMStudio in {duration:.2f}s")
# Prepare response headers
response_headers = {k: v for k, v in resp.headers.items()
if k.lower() not in HOP_BY_HOP_HEADERS}
response_headers.pop("content-length", None)
media_type = resp.headers.get("content-type")
return Response(content=resp.content, status_code=resp.status_code,
headers=response_headers, media_type=media_type)
except Exception as e:
logger.exception(f"Proxy error to {target_url}: {e}")
return PlainTextResponse("Upstream request failed", status_code=502)
async def simple_proxy(request: Request, full_path: str):
start = time.time()
target_url = f"{build_target_base()}/{full_path}"
client_host = getattr(request.client, "host", "unknown")
logger.info(f"Simple proxy {request.method} from {client_host} -> {target_url}")
body = await request.body()
logger.debug(f"Simple proxy body: {safe_log_body(body)}")
try:
async with httpx.AsyncClient(timeout=PROXY_TIMEOUT) as client:
upstream_headers = {k: v for k, v in request.headers.items()
if k.lower() not in HOP_BY_HOP_HEADERS}
upstream_headers.pop("content-length", None)
resp = await client.request(request.method, target_url, headers=upstream_headers, content=body)
logger.info(f"Simple proxy done {resp.status_code} ({time.time()-start:.2f}s)")
response_headers = {k: v for k, v in resp.headers.items()
if k.lower() not in HOP_BY_HOP_HEADERS}
response_headers.pop("content-length", None)
media_type = resp.headers.get("content-type")
return Response(content=resp.content, status_code=resp.status_code,
headers=response_headers, media_type=media_type)
except Exception as e:
logger.exception(f"Error proxying {target_url}: {e}")
return PlainTextResponse("Upstream failed", status_code=502)
# --- WebSocket proxy ---
@app.websocket_route("/{full_path:path}")
async def websocket_proxy(ws: WebSocket, full_path: str):
await ws.accept()
query = ws.scope.get("query_string", b"").decode()
target_ws_url = f"{build_target_ws_base()}/{full_path}"
if query:
target_ws_url += "?" + query
client_host = getattr(ws.client, "host", "unknown")
logger.info(f"WS proxy {client_host} -> {target_ws_url}")
try:
async with websockets.connect(target_ws_url) as upstream:
async def from_client():
while True:
data = await ws.receive_text()
logger.debug(f"WS <- client: {data[:200]}")
await upstream.send(data)
async def from_upstream():
async for msg in upstream:
logger.debug(f"WS -> client: {str(msg)[:200]}")
await ws.send_text(msg)
await asyncio.gather(from_client(), from_upstream())
except (ConnectionClosedOK, ConnectionClosedError):
logger.info("Upstream websocket closed")
await ws.close()
except Exception as e:
logger.exception(f"WebSocket error: {e}")
await ws.close()
if __name__ == "__main__":
import uvicorn
logger.info(f"Starting LMStudio relay -> {build_target_base()} (listen {LISTEN_HOST}:{LISTEN_PORT})")
uvicorn.run(app, host=LISTEN_HOST, port=LISTEN_PORT, log_level="info", access_log=True)
Код сервиса для примера и понимания
#!/usr/bin/env python3
import pyodbc
import os
from datetime import datetime
from fastapi import FastAPI, HTTPException
import uvicorn
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
# Конфигурация
PORT = 12323
HOST = "0.0.0.0"
app = FastAPI(title="Marine Vessels Tracker Service", description="Сервис для отслеживания морских судов")
class VesselsRequest(BaseModel):
vessel_name: Optional[str] = None
min_speed: Optional[float] = None
max_speed: Optional[float] = None
limit: Optional[int] = 50
class RadiusRequest(BaseModel):
center_lat: float
center_lon: float
radius_km: float = 10.0
class VesselsResponse(BaseModel):
prompt: str
vessels_count: int
timestamp: str
class DatabaseService:
"""Сервис для работы с базой данных судов"""
def __init__(self):
self.server = ""
self.database = ""
self.username = ""
self.password = ""
def get_connection(self):
"""Создание подключения к базе данных"""
try:
connection_string = f"""
DRIVER={{ODBC Driver 18 for SQL Server}};
SERVER={self.server};
DATABASE={self.database};
UID={self.username};
PWD={self.password};
TrustServerCertificate=yes;
"""
return pyodbc.connect(connection_string)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка подключения к БД: {str(e)}")
def get_latest_vessels_positions(self, vessel_name: Optional[str] = None,
min_speed: Optional[float] = None,
max_speed: Optional[float] = None,
limit: int = 50) -> List[Dict[str, Any]]:
"""Получение последних позиций судов"""
conn = self.get_connection()
try:
cursor = conn.cursor()
# Базовый SQL запрос
sql_query = """
SELECT NAME, SPD, LAT, LON, DT
FROM [new].[DBO].[base_data]
WHERE ID = 1
"""
parameters = []
# Добавляем фильтры если указаны
if vessel_name:
sql_query += " AND NAME LIKE ?"
parameters.append(f"%{vessel_name}%")
if min_speed is not None:
sql_query += " AND SPD >= ?"
parameters.append(min_speed)
if max_speed is not None:
sql_query += " AND SPD <= ?"
parameters.append(max_speed)
# Сортируем по времени и ограничиваем количество
sql_query += " ORDER BY DT DESC"
if limit:
sql_query += f" OFFSET 0 ROWS FETCH NEXT {limit} ROWS ONLY"
# Выполняем запрос
cursor.execute(sql_query, parameters)
results = cursor.fetchall()
# Форматируем результат
vessels = []
for ves_name, spd, lat, lon, dt in results:
vessel_data = {
"name": ves_name,
"speed": float(spd) if spd is not None else 0.0,
"latitude": float(lat) if lat is not None else 0.0,
"longitude": float(lon) if lon is not None else 0.0,
"timestamp": str(dt) if dt else "Неизвестно"
}
vessels.append(vessel_data)
return vessels
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка выполнения запроса: {str(e)}")
finally:
conn.close()
def find_vessels_in_radius(self, center_lat: float, center_lon: float, radius_km: float = 10.0) -> List[Dict[str, Any]]:
"""Поиск судов в радиусе от указанных координат"""
conn = self.get_connection()
try:
cursor = conn.cursor()
# SQL запрос с расчетом расстояния
sql_query = """
SELECT
NAME, SPD, LAT, LON, DT,
6371 * 2 * ASIN(SQRT(
POWER(SIN((RADIANS(?) - RADIANS(LAT))/2), 2) +
COS(RADIANS(LAT)) * COS(RADIANS(?)) *
POWER(SIN((RADIANS(?) - RADIANS(LON))/2), 2)
)) as distance_km
FROM [PRTN].[DBO].[POSP]
WHERE ID = 1
AND 6371 * 2 * ASIN(SQRT(
POWER(SIN((RADIANS(?) - RADIANS(LAT))/2), 2) +
COS(RADIANS(LAT)) * COS(RADIANS(?)) *
POWER(SIN((RADIANS(?) - RADIANS(LON))/2), 2)
)) <= ?
ORDER BY distance_km
"""
parameters = [center_lat, center_lat, center_lon, center_lat, center_lat, center_lon, radius_km]
cursor.execute(sql_query, parameters)
results = cursor.fetchall()
# Форматируем результат
vessels = []
for ves_name, spd, lat, lon, dt, distance in results:
vessel_data = {
"name": name,
"speed": float(spd) if spd is not None else 0.0,
"latitude": float(lat) if lat is not None else 0.0,
"longitude": float(lon) if lon is not None else 0.0,
"timestamp": str(dt) if dt else "Неизвестно",
"distance_km": float(distance) if distance is not None else 0.0
}
vessels.append(vessel_data)
return vessels
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка поиска в радиусе: {str(e)}")
finally:
conn.close()
# Инициализация сервиса
db_service = DatabaseService()
def create_vessels_prompt(vessels: List[Dict[str, Any]], request_type: str = "latest") -> str:
"""Создание промпта из данных о судах"""
if not vessels:
return "Не найдено судов по указанным критериям."
if request_type == "latest":
prompt = "ПОСЛЕДНИЕ ПОЗИЦИИ МОРСКИХ СУДОВ\n\n"
prompt += f"Всего судов: {len(vessels)}\n"
prompt += f"Время запроса: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
for i, vessel in enumerate(vessels, 1):
# Определяем статус по скорости
speed = vessel["speed"]
if speed == 0:
status = "На якоре"
elif speed < 5:
status = "Медленно"
elif speed < 15:
status = "Обычная"
else:
status = "Быстро"
prompt += f"СУДНО #{i}\n"
prompt += f" Название: {vessel['name']}\n"
prompt += f" Скорость: {speed} узлов ({status})\n"
prompt += f" Координаты: {vessel['latitude']:.6f}°N, {vessel['longitude']:.6f}°E\n"
prompt += f" Время позиции: {vessel['timestamp']}\n"
prompt += f" Ссылка на карты: https://www.google.com/maps?q={vessel['latitude']},{vessel['longitude']}\n\n"
# Добавляем статистику
speeds = [v["speed"] for v in vessels]
prompt += "СТАТИСТИКА:\n"
prompt += f" Всего судов: {len(vessels)}\n"
prompt += f" Средняя скорость: {sum(speeds)/len(speeds):.1f} узлов\n"
prompt += f" Минимальная скорость: {min(speeds):.1f} узлов\n"
prompt += f" Максимальная скорость: {max(speeds):.1f} узлов\n"
prompt += f" На якоре (0 узлов): {sum(1 for s in speeds if s == 0)} судов\n\n"
elif request_type == "radius":
prompt = f"СУДА В РАДИУСЕ {vessels[0]['radius_km'] if vessels else 0} КМ\n\n"
prompt += f"Центр: {vessels[0]['center_lat'] if vessels else 0:.6f}°N, {vessels[0]['center_lon'] if vessels else 0:.6f}°E\n"
prompt += f"Найдено судов: {len(vessels)}\n\n"
for i, vessel in enumerate(vessels, 1):
prompt += f"#{i}: {vessel['name']}\n"
prompt += f" Расстояние: {vessel['distance_km']:.1f} км\n"
prompt += f" Координаты: {vessel['latitude']:.6f}°N, {vessel['longitude']:.6f}°E\n"
prompt += f" Скорость: {vessel['speed']} узлов\n"
prompt += f" Время: {vessel['timestamp']}\n\n"
prompt += "Используй эти данные для:\n"
prompt += " - Анализа движения судов\n"
prompt += " - Построения маршрутов\n"
prompt += " - Поиска конкретных судов\n"
prompt += " - Мониторинга морской активности\n"
return prompt
@app.post("/vessels/latest", response_model=VesselsResponse)
async def get_latest_vessels_positions(request: VesselsRequest):
"""
Получение последних позиций судов
"""
try:
print(f"? Запрос позиций судов: name={request.vessel_name}, speed={request.min_speed}-{request.max_speed}")
vessels = db_service.get_latest_vessels_positions(
vessel_name=request.vessel_name,
min_speed=request.min_speed,
max_speed=request.max_speed,
limit=request.limit
)
prompt = create_vessels_prompt(vessels, "latest")
response = VesselsResponse(
prompt=prompt,
vessels_count=len(vessels),
timestamp=datetime.now().isoformat()
)
print(f"✅ Возвращено {len(vessels)} судов")
return response
except HTTPException:
raise
except Exception as e:
print(f"❌ Ошибка при получении позиций судов: {e}")
raise HTTPException(status_code=500, detail=f"Ошибка получения данных: {str(e)}")
@app.post("/vessels/radius")
async def find_vessels_in_radius(request: RadiusRequest):
"""
Поиск судов в радиусе от указанных координат
"""
try:
print(f"? Поиск судов в радиусе {request.radius_km}км от ({request.center_lat}, {request.center_lon})")
vessels = db_service.find_vessels_in_radius(
center_lat=request.center_lat,
center_lon=request.center_lon,
radius_km=request.radius_km
)
# Добавляем информацию о радиусе в данные судов
for vessel in vessels:
vessel["radius_km"] = request.radius_km
vessel["center_lat"] = request.center_lat
vessel["center_lon"] = request.center_lon
prompt = create_vessels_prompt(vessels, "radius")
response = VesselsResponse(
prompt=prompt,
vessels_count=len(vessels),
timestamp=datetime.now().isoformat()
)
print(f"✅ Найдено {len(vessels)} судов в радиусе")
return response
except HTTPException:
raise
except Exception as e:
print(f"❌ Ошибка при поиске судов в радиусе: {e}")
raise HTTPException(status_code=500, detail=f"Ошибка поиска: {str(e)}")
@app.get("/health")
async def health_check():
"""Проверка здоровья сервиса"""
try:
# Пробуем получить одно судно для проверки подключения
test_vessels = db_service.get_latest_vessels_positions(limit=1)
return {
"status": "healthy",
"service": "Marine Vessels Tracker",
"database_accessible": len(test_vessels) >= 0,
"server": db_service.server,
"database": db_service.database
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Сервис не здоров: {str(e)}")
@app.get("/")
async def root():
"""Корневой эндпоинт с информацией о сервисе"""
return {
"service": "Marine Vessels Tracker Service",
"version": "1.0",
"description": "Сервис для отслеживания позиций морских судов в реальном времени",
"endpoints": {
"POST /vessels/latest": "Получение последних позиций судов",
"POST /vessels/radius": "Поиск судов в радиусе от координат",
"GET /health": "Проверка здоровья сервиса"
},
"config": {
"database_server": db_service.server,
"database_name": db_service.database,
"default_limit": 50
}
}
if __name__ == "__main__":
print(f"? Запуск Marine Vessels Tracker Service на {HOST}:{PORT}")
print(f"?️ База данных: {db_service.server}/{db_service.database}")
print("? Доступные эндпоинты:")
print(" POST /vessels/latest - последние позиции судов")
print(" POST /vessels/radius - поиск судов в радиусе")
print(" GET /health - проверка здоровья сервиса")
print(" GET / - информация о сервисе")
uvicorn.run(app, host=HOST, port=PORT, log_level="info")
fuCtor
Код все же лучше в git держать и дать ссылку. Так проще будет его копировать и видеть актуальную версию.
stilrambler Автор
Согласен обычно я так и делаю. Но я не уверен что это кому либо надо. И продолжать это я не буду поэтому и выложил в таком виде. У меня вторая версия она намного интереснее.
Andreas_Fogel
В чем отличие от mcp серверов, которые как раз можно в lmstudio добавлять?
stilrambler Автор
Об этом сказано в тексте. Они обрабатываются в локальных запросах. Сетевые запросы не обрабатываются. Возможно в будущих версиях будет по другому. Так же данный прокси легко настроить на оламму где такой обработки в принципе нет.