Мнение.

Основные проблемы в корпоративном IT это, как и прежде: инфраструктура, безопасность и работа с данными.

AI и, так называемые, Агенты AI, в этой сфере, в ближайшие 2-3 года, мало что поменяют.

В корпоративном секторе столько неэффективности и реакционности, что буст продуктивности, который принесет AI станет каплей в море.

Миграция с Oracle на Postgresql или переезд с Lotus Domino, для большинства крупных не-IT компаний принесет больше пользы, чем внедрение AI, здесь и сейчас.

Без современной инфраструктуры и стека данных, внедрение AI не отобьет OPEX и тем более CAPEX.

Единственное революционное измерение, которые стоит ожидать уже в этом году – западные корпорации будут отказывается от аутотренинг, особенно в Индии.

Вот кстати пример, того как можно автоматизировать работу с данными.

Представим себе, что нам на почту присылают письма с вложенными csv, json и xml файлами. Скрипт, который запускается по расписанию и ищет письма с словом «documents» в теме и вложенным файлом в формате csv. Сохраняет эти файлы в папку.

import imaplib
import email
from email.header import decode_header
import os
import csv

IMAP_SERVER = "imap.yandex.com"
EMAIL = "email@yandex.com"
PASSWORD = "password"
DOWNLOAD_FOLDER = "attachments"

if not os.path.exists(DOWNLOAD_FOLDER):
    os.makedirs(DOWNLOAD_FOLDER)

def connect_to_imap():
    try:
        print(f"Connecting to IMAP server: {IMAP_SERVER}")
        mail = imaplib.IMAP4_SSL(IMAP_SERVER, port=993)
        mail.login(EMAIL, PASSWORD)
        mail.select("INBOX")
        print("Connected successfully.")
        return mail
    except imaplib.IMAP4.error as e:
        print(f"IMAP error: {e}")
        print("Check your email and password, or enable app passwords if 2FA is enabled.")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

def decode_header_value(encoded_value):
    decoded_parts = decode_header(encoded_value)
    result = ""
    for part, encoding in decoded_parts:
        if isinstance(part, bytes):
            try:
                result += part.decode(encoding or "utf-8")
            except LookupError:
                result += part.decode("latin-1")
            except UnicodeDecodeError:
                result += part.decode("utf-8", errors="ignore")
        else:
            result += part
    return result

def extract_csv_attachments(msg):
    for part in msg.walk():
        if part.get_content_maintype() == "multipart":
            continue
        if part.get("Content-Disposition") is None:
            continue

        filename = part.get_filename()
        if filename and filename.lower().endswith(".csv"):
            filename = decode_header_value(filename)
            print(f"Found CSV attachment: {filename}")

            filepath = os.path.join(DOWNLOAD_FOLDER, filename)
            with open(filepath, "wb") as f:
                f.write(part.get_payload(decode=True))
            print(f"Saved CSV attachment: {filename}")

def fetch_emails():
    mail = connect_to_imap()
    if not mail:
        return

    try:
        status, messages = mail.search(None, 'SUBJECT "documents"')
        if status != "OK":
            print("No emails found with 'documents' in the subject.")
            return

        email_ids = messages[0].split()
        print(f"Found {len(email_ids)} emails with 'documents' in the subject.")

        for mail_id in reversed(email_ids):
            status, data = mail.fetch(mail_id, "(RFC822)")
            if status != "OK":
                continue

            msg = email.message_from_bytes(data[0][1])
            subject = decode_header_value(msg["Subject"])
            print(f"Processing email: {subject}")

            extract_csv_attachments(msg)
    finally:
        mail.logout()

if __name__ == "__main__":
    fetch_emails()

Другой скрипт забирает csv и перекладывает их в duckDB – ультрабыстрая колоночная RDBMS, которая пользуется популярностью среди дата-аналитиков:

import os
import duckdb

DUCKDB_PATH = "my_database.duckdb"

ATTACHMENTS_DIR = "attachments"

def send_csv_to_duckdb():
    conn = duckdb.connect(DUCKDB_PATH)
    print(f"Connected to DuckDB database: {DUCKDB_PATH}")

    for filename in os.listdir(ATTACHMENTS_DIR):
        if filename.endswith(".csv"):
            filepath = os.path.join(ATTACHMENTS_DIR, filename)
            table_name = os.path.splitext(filename)[0]
            print(f"Processing file: {filename}")

            quoted_table_name = f'"{table_name}"'

            conn.execute(
                f"CREATE OR REPLACE TABLE {quoted_table_name} AS SELECT * FROM read_csv_auto('{filepath}')"
            )
            print(f"Data from {filename} sent to DuckDB table {quoted_table_name}.")

    print("All CSV files processed.")

if __name__ == "__main__":
    send_csv_to_duckdb()

Metabase – один из лучших инструментов с открытым исходным кодом для аналитики и визуализации данных.

docker run -d -p 3000:3000 -v "$(pwd)":/srv --name metabase metabase/metabase

Для работы с duckdb, необходимо установить плагин:

https://github.com/MotherDuck-Open-Source/metabase_duckdb_driver

Дальше в дело вступает Sling это инструмент для репликации между базами и различными типами хранилищ. Вместо нагромождения UI и микросервисов вроде Airbyte, эти ребята решили пойти по простому пути и написали rsync или rclone для работы с большими данными.

curl -LO 'https://github.com/slingdata-io/sling-cli/releases/latest/download/sling_linux_amd64.tar.gz' \
  && tar xf sling_linux_amd64.tar.gz \
  && rm -f sling_linux_amd64.tar.gz \
  && chmod +x sling

mv sling /usr/local/bin/
$ sling conns list
+--------------------------+-----------------+-------------------+
| CONN NAME                | CONN TYPE       | SOURCE            |
+--------------------------+-----------------+-------------------+
| AWS_S3                   | FileSys - S3    | sling env yaml    |
| DO_SPACES                | FileSys - S3    | sling env yaml    |
| LOCALHOST_DEV            | DB - PostgreSQL | dbt profiles yaml |
| MSSQL                    | DB - SQLServer  | sling env yaml    |
| MYSQL                    | DB - MySQL      | sling env yaml    |
| ORACLE_DB                | DB - Oracle     | env variable      |
| MY_PG                    | DB - PostgreSQL | sling env yaml    |
+--------------------------+-----------------+-------------------+

Дата пайплайн можно описать в простом YAML файле. В данном случае мы будем брать csv файл и перекладывать его в PostgreSQL и minio s3 для долгосрочного хранения:

version: 1
sources:
  duckdb_source:
    type: duckdb
    path: /path/to/your_database.duckdb

targets:
  minio_target:
    type: s3
    bucket: <bucket>
    access_key_id: <access_key_id>
    secret_access_key: '<secret_access_key>'
    endpoint: '<endpoint>'
    url_style: path

  postgres_target:
    type: postgres
    host: postgres_host
    port: 5432
    database: mydb
    username: user
    password: password

pipelines:
  - name: duckdb_to_minio
    source: duckdb_source
    target: minio_target
    query: "SELECT * FROM sales"
    mode: overwrite

  - name: duckdb_to_postgres
    source: duckdb_source
    target: postgres_target
    query: "SELECT * FROM sales"
    mode: overwrite

Ну и наконец AI. Куда же без него?
Ollama это очень популярный проект для работы с AI моделями локально, предоставляет не только CLI но и отличную библиотеку на Python.

Сначала нужно установить зависимости и скачать модельку:

pip install ollama graphvizollama run qwen2.5-coder:7b

Для своей весовой категории qwen2.5 это одна из лучших открытых LLM.

Скрипт с помощью qwen сделает краткое изложение csv на 10 гигабайт, а если надо нарисует графики и сконвертирует этот файл в нужный формат.

analyze_csv.py
analyze_csv.py

Все зависит от того, какой вы отправите промпт:

промпт
промпт

Скрипт:

#!/usr/bin/env python3
import csv
import ollama
import argparse
import logging
from typing import Optional

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

PROMPT_TEMPLATE = """[INST] <<SYS>>
You are a data analysis expert. Analyze this CSV data and:

1. Summarize the data
2. Identify trends or patterns
3. Provide insights and recommendations

Format response with:
- Markdown analysis
- Key insights
- Recommendations for further analysis
<</SYS>>

CSV Data:
{data}
[/INST]"""

def read_csv(file_path: str) -> str:
    try:
        with open(file_path, "r") as file:
            reader = csv.reader(file)
            rows = [",".join(row) for row in reader]
        return "\n".join(rows)
    except Exception as e:
        logging.error(f"Error reading CSV file: {e}")
        raise

def analyze_csv(csv_file: str, context_size: Optional[int] = None) -> str:
    try:
        logging.info(f"Reading CSV file: {csv_file}")
        csv_text = read_csv(csv_file)
        logging.info(f"Extracted CSV text: {csv_text[:100]}...")

        options = {}
        if context_size:
            options['max_tokens'] = context_size

        logging.info("Sending data to Ollama model...")
        response = ollama.chat(
            model='qwen2.5-coder:7b',
            messages=[{'role': 'user', 'content': PROMPT_TEMPLATE.format(data=csv_text)}],
            options=options
        )['message']['content']
        logging.info("Received response from Ollama model")

        return response

    except Exception as e:
        logging.error(f"An error occurred: {e}")
        raise

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='CSV AI Analyzer')
    parser.add_argument('csv_file', help='Input CSV file')
    parser.add_argument('--context-size', type=int, help='Optional context size (max tokens) for the Ollama model')
    args = parser.parse_args()

    try:
        report = analyze_csv(args.csv_file, args.context_size)
        print("\nAnalysis Report:")
        print(report)
    except Exception as e:
        logging.error(f"Script failed: {e}")

Комментарии (2)


  1. kewgen
    26.01.2025 13:11

    Если вам до сих пор на почту приходит csv файл, то AI уже не спасёт


    1. Rikimaru22 Автор
      26.01.2025 13:11

      Хорошая, кстати, иллюстрация почему AI не сможет, в ближайшие несколько лет, изменить энтерпрайз. Выгрузки данных в csv и json, обмен xml файлами по FTP, SOAP – это все реалии для корпоративного IT в финансовой сфере.