Привет, Хабр!

Combine — это фреймворк, представленный Apple в 2019 году, предназначенный для работы с асинхронными потоками данных на платформе Swift. Основная задача Combine заключается в том, чтобы упростить и унифицировать обработку асинхронных событий, таких как сетевые запросы, таймеры, уведомления и пользовательские действия.

Фреймворк является частью экосистемы Swift и доступен начиная с iOS 13, macOS 10.15, watchOS 6.0 и tvOS 13. Если вы работаете с более ранними версиями, обновление Xcode до версии 11 или выше автоматом предоставит вам доступ к Combine.

В этой статье мы рассмотрим основы этого замечательного фреймворка.

После установки Xcode вы сможете использовать Combine в своих проектах, импортировав его следующим образом:

import Combine

Создание и комбинирование потоков данных с помощью Combine

В Combine всё начинается с Publisher — сущности, которая эмитирует значения с течением времени. Это может быть что угодно: массив чисел, результат сетевого запроса, или события пользовательского интерфейса.

Простейший пример Publisher — это массив, преобразованный в Publisher с помощью .publisher:

import Combine

let numbers = [1, 2, 3, 4, 5].publisher

numbers.sink { value in
    print("Received value: \(value)")
}

Массив numbers превращается в Publisher, который выдает каждое число последовательно. Метод sink — это Subscriber, который "подписывается" на Publisher и выводит каждый полученный элемент.

Если хочется создать собственный Publisher, можно использовать PassthroughSubject или CurrentValueSubject:

import Combine

let subject = PassthroughSubject<String, Never>()

subject.sink { value in
    print("Received value: \(value)")
}

subject.send("Hello")
subject.send("Combine")

Здесь PassthroughSubject позволяет вручную выдавать значения, используя метод send, можно контролировать, когда и какие данные будут отправлены подписчикам.

Когда дело доходит до объединения данных из разных источников, есть несколько операторов: merge, combineLatest и zip.

  • merge объединяет несколько Publisher-ов в один, эмитируя значения по мере их поступления:

import Combine

let publisher1 = PassthroughSubject<Int, Never>()
let publisher2 = PassthroughSubject<Int, Never>()

let merged = publisher1.merge(with: publisher2)

merged.sink { value in
    print("Merged value: \(value)")
}

publisher1.send(1)
publisher2.send(2)
publisher1.send(3)

Результат работы merge будет последовательность: 1, 2, 3, т.е. значения от обоих Publisher-ов объединяются в один поток.

  • combineLatest выдает пары значений от обоих Publisher-ов, когда оба источника данных предоставили хотя бы одно значение:

import Combine

let publisherA = PassthroughSubject<String, Never>()
let publisherB = PassthroughSubject<String, Never>()

let combined = publisherA.combineLatest(publisherB)

combined.sink { value in
    print("Combined value: \(value)")
}

publisherA.send("Hello")
publisherB.send("World")
// Вывод: "Combined value: ("Hello", "World")"
publisherA.send("Hi")
// Вывод: "Combined value: ("Hi", "World")"

combineLatest ждет данные от обоих Publisher-ов и затем выдает их в виде пары, каждый раз, когда поступает новое значение от одного из них.

  • zip работает подобно combineLatest, но выдает пары только тогда, когда обе стороны готовы передать по одному значению:

import Combine

let pub1 = PassthroughSubject<Int, Never>()
let pub2 = PassthroughSubject<String, Never>()

let zipped = pub1.zip(pub2)

zipped.sink { value in
    print("Zipped value: \(value)")
}

pub1.send(1)
pub2.send("One")
// Вывод: "Zipped value: (1, "One")"
pub1.send(2)
pub2.send("Two")
// Вывод: "Zipped value: (2, "Two")"

Этот оператор хорош, когда необходимо синхронизировать данные из разных источников.

Допустим, есть приложение, которое принимает пользовательский ввод и результат предсказания модели, и вы хочется комбинировать эти данные для отображения их в UI:

import Combine

// Создаем два Publisher-а: один для ввода пользователя, другой для предсказаний
let userInput = PassthroughSubject<String, Never>()
let prediction = PassthroughSubject<String, Never>()

// Комбинируем данные, используя combineLatest
let combinedData = userInput.combineLatest(prediction)

combinedData.sink { user, predicted in
    print("User input: \(user), Model prediction: \(predicted)")
}

// Эмитируем значения
userInput.send("User says: Hello")
prediction.send("Model predicts: Hi")
// Вывод: "User input: User says: Hello, Model prediction: Hi"

combineLatest позволяет связать ввод пользователя с результатами работы ML-модели.

Трансформация данных в потоках

Оператор map является основным инструментом для трансформации данных в Combine. Он позволяет взять данные, выдаваемые Publisher-ом, и преобразовать их в новую форму, изменяя каждый элемент потока.

Представим, что есть поток чисел, и хочется умножить каждое из них на 2. Это легко реализуется с помощью map:

import Combine

let numbers = [1, 2, 3, 4, 5].publisher

let multipliedNumbers = numbers.map { $0 * 2 }

multipliedNumbers.sink { value in
    print("Transformed value: \(value)")
}

Результатом выполнения этого кода будет вывод:

Transformed value: 2
Transformed value: 4
Transformed value: 6
Transformed value: 8
Transformed value: 10

Оператор map подходит для предобработки данных перед их передачей в ML модель. Например, если нужно нормализовать входные данные перед подачей их в модель:

let rawInputs = [0.5, 0.75, 1.0, 1.25].publisher

let normalizedInputs = rawInputs.map { $0 / 2.0 }

normalizedInputs.sink { value in
    print("Normalized value: \(value)")
}

Оператор filter позволяет отфильтровывать значения, пропуская только те, которые удовлетворяют определенным условиям.

Рассмотрим пример, где нужно оставить в потоке только четные числа:

let numbers = [1, 2, 3, 4, 5, 6].publisher

let evenNumbers = numbers.filter { $0 % 2 == 0 }

evenNumbers.sink { value in
    print("Filtered value: \(value)")
}

Вывод:

Filtered value: 2
Filtered value: 4
Filtered value: 6

В контексте ML, filter может быть полезен для удаления выбросов или данных, не соответствующих определенным критериям. Например, если нужно исключить из входных данных все значения, превышающие допустимый предел:

let rawInputs = [0.5, 2.5, 1.0, 3.5].publisher

let validInputs = rawInputs.filter { $0 <= 2.0 }

validInputs.sink { value in
    print("Valid input: \(value)")
}

flatMap — это оператор, который раскрывает новый Publisher на основе каждого значения, выдаваемое исходным Publisher-ом.

Рассмотрим пример с сетевым запросом, который возвращает данные на основе пользовательского ввода:

import Foundation

let userInput = PassthroughSubject<String, Never>()
let searchResults = userInput.flatMap { query in
    URLSession.shared.dataTaskPublisher(for: URL(string: "https://api.example.com/search?q=\(query)")!)
        .map { $0.data }
        .decode(type: [String].self, decoder: JSONDecoder())
        .catch { _ in Just([]) }  // Обрабатываем возможные ошибки
}

searchResults.sink { results in
    print("Search results: \(results)")
}

userInput.send("apple")

flatMap используется для запуска сетевого запроса на основе пользовательского ввода. Каждый раз, когда пользователь вводит новый запрос, flatMap раскрывает новый Publisher, который затем обрабатывает результаты и возвращает их.

Примеры использования

Предположим, есть поток сырых данных, которые нужно нормализовать и отфильтровать перед подачей в модель:

let rawInputs = [1.5, 0.9, 2.5, 3.7, 1.8].publisher

let processedInputs = rawInputs
    .filter { $0 < 3.0 } // Оставляем только допустимые значения
    .map { $0 / 3.0 } // Нормализуем данные

processedInputs.sink { value in
    print("Processed input: \(value)")
}

После того как модель выдала свои предсказания, возможно, потребуется постобработка данных, чтобы они стали пригодными для отображения в UI или дальнейшей аналитики:

let modelPredictions = [0.2, 0.8, 1.0, 0.4].publisher

let filteredPredictions = modelPredictions
    .filter { $0 > 0.5 } // Оставляем только уверенные предсказания
    .map { $0 * 100.0 } // Преобразуем в проценты

filteredPredictions.sink { value in
    print("Filtered prediction: \(value)%")
}

Управление ошибками и отложенная обработка данных

В Combine ошибки обрабатываются с помощью операторов catch и retry.

Оператор catch позволяет перехватывать ошибки, возникающие в потоке данных, и предоставлять альтернативный Publisher для продолжения работы.

Пример:

import Combine

struct MyError: Error {}

let faultyPublisher = Fail<Int, MyError>(error: MyError())

let safePublisher = faultyPublisher
    .catch { _ in Just(0) } // В случае ошибки возвращаем значение 0
    .sink { value in
        print("Received value: \(value)")
    }

// Вывод: "Received value: 0"

ИспользуемFail для эмуляции ошибки, а затем обработали ее с помощью catch, который возвращает безопасное значение.

Оператор retry позволяет автоматически повторять выполнение потока данных, если в нем произошла ошибка.

Пример:

import Combine

var attemptCount = 0

let retryingPublisher = Deferred {
    attemptCount += 1
    return attemptCount < 3 ? Fail(error: URLError(.badServerResponse)) : Just("Success")
}
.retry(3) // Повторяем до трех раз
.sink(
    receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("Completed successfully")
        case .failure(let error):
            print("Failed with error: \(error)")
        }
    },
    receiveValue: { value in
        print("Received value: \(value)")
    }
)

// Вывод: "Received value: Success"

Здесь retry пытается выполнить поток данных до трех раз. На третьей попытке поток завершается успешно.

Работа с асинхронными данными часто требует управления частотой обработки событий, особенно если поток данных поступает быстро и может перегрузить систему. Combine предоставляет два оператора для этой задачи: debounce и throttle.

debounce — это оператор, который откладывает обработку событий до тех пор, пока не пройдет определенное время без поступления новых данных.

Пример:

import Combine
import Foundation

let searchTextPublisher = PassthroughSubject<String, Never>()

let debouncedSearch = searchTextPublisher
    .debounce(for: .milliseconds(500), scheduler: RunLoop.main)
    .sink { value in
        print("Search query: \(value)")
    }

searchTextPublisher.send("S")
searchTextPublisher.send("Sw")
searchTextPublisher.send("Swi")
searchTextPublisher.send("Swif")
searchTextPublisher.send("Swift")
// Через 500 мс после последнего ввода: "Search query: Swift"

Здесь debounce откладывает отправку данных до тех пор, пока пользователь не закончит ввод текста, избегая излишних запросов.

throttle позволяет пропускать события только через определенные интервалы времени, ограничивая частоту их обработки.

Пример:

import Combine
import Foundation

let eventPublisher = PassthroughSubject<Void, Never>()

let throttledEvents = eventPublisher
    .throttle(for: .seconds(1), scheduler: RunLoop.main, latest: true)
    .sink {
        print("Event received")
    }

for _ in 1...5 {
    eventPublisher.send()
    Thread.sleep(forTimeInterval: 0.3) // Эмуляция частого срабатывания события
}

// Вывод: "Event received" будет напечатано только один раз в секунду

throttle в этом примере пропускает только одно событие в секунду, игнорируя остальные, что позволяет эффективно управлять нагрузкой.

Рассмотрим сценарий, где идет работа с потоком данных из нескольких источников, например, с сенсоров и сервера, и нужно управлять ошибками и задержками:

import Combine
import Foundation

enum SensorError: Error {
    case sensorFailure
}

let sensorDataPublisher = PassthroughSubject<Int, SensorError>()
let serverDataPublisher = PassthroughSubject<Int, URLError>()

let combinedPublisher = Publishers.CombineLatest(sensorDataPublisher, serverDataPublisher)
    .retry(2) // Пытаемся заново в случае ошибки
    .debounce(for: .milliseconds(300), scheduler: RunLoop.main) // Избегаем частой обработки данных
    .catch { _ in Just((0, 0)) } // В случае ошибки возвращаем безопасное значение
    .sink { sensorData, serverData in
        print("Sensor: \(sensorData), Server: \(serverData)")
    }

sensorDataPublisher.send(100)
serverDataPublisher.send(200)

// Эмулируем ошибку сенсора
sensorDataPublisher.send(completion: .failure(.sensorFailure))

serverDataPublisher.send(300)

Используем CombineLatest для объединения данных от сенсоров и сервера, обрабатываем ошибки с помощью retry, избегаем частой обработки данных с помощью debounce, и, наконец, безопасно обрабатываем ошибки с помощью catch.

Подробнее с библиотекой Combine можно ознакомиться здесь.


Актуальна тема подготовки данных в Pandas? Приходите на открытый урок 19 августа, который пройдет в рамках специализации Machine Learning.

В результате вебинара вы узнаете о методах подготовки данных и научитесь чистить данные при помощи библиотеки Pandas. Записывайтесь на урок по ссылке.

Комментарии (0)