Путешествие по созданию конвейера обучения с использованием TFRecords и TFRanking

В нашей предыдущей статье мы рассказали об алгоритме, который использует Veepee для ранжирования продаж, отображаемых на главной странице. Теперь мы представим нашу текущую реализацию кода и первую попытку улучшить его.

Предыдущий рабочий процесс

Наш текущий конвейер состоит из трех основных этапов:

  • Генерация функций в BigQuery с использованием DBT и Python.
  • Извлечение этих функций в паркетные файлы с помощью скриптов Bash и Python.
  • Учебная часть с использованием Tensorflow

Эти шаги реализуются с использованием комбинации инструментов на Google Cloud Platform (в первую очередь BigQuery и Cloud Storage) для создания наборов данных и локального сервера для преобразования и обработки данных.

Обслуживание прогнозов в реальном времени обрабатывается отдельным API, созданным с использованием Tensorflow Serving и Scala.

Функциональная инженерия

На этом этапе есть две основные цели:

  • Объедините все данные Veepee для создания пользователей и функций продаж
    (~ 75 млн строк, 150Go для набора поездов)

Мы интенсивно используем BigQuery в основном из-за его скорости. Недостатком является ремонтопригодность сложных запросов и оркестровка DAG. Мы пытались использовать Beam with Dataflow с помощью Python, но производительность была очень низкой.

  • Создайте таблицы train / eval / test

Как объяснялось в предыдущей статье, мы выбираем продажи в диапазоне дат, в котором участник совершил покупку, и для каждой из этих продаж мы выбираем связанный отрицательный пример: случайную продажу, в которой участник не совершал покупки, но это было отображается на главной странице участника одновременно с продажей, в которой участник совершил покупку.

Что касается обучения и оценки, мы берем только одну отрицательную продажу, поскольку мы используем попарный подход. Однако для набора тестов, поскольку мы хотим вычислить показатели производительности, такие как средний рейтинг¹, нам нужны все отрицательные наблюдения: чтобы знать, где вы бы оценили положительный пример, вам нужно дать оценку всем продажам и ранжировать их, вот почему вам нужны положительные и отрицательные продажи.

Извлечение данных

Чтобы передать данные в наш алгоритм и получить хорошую производительность ввода-вывода, нам нужны паркетные файлы.

  • Сначала мы передаем данные в GCS в формате Avro с помощью bqutils.
bq extract \
    --destination_format AVRO \
    ${PROJECT_NAME}:${DATA_SET}.${TRAIN_TABLE} \
    ${GCS_TRAIN_EXPORT_PATH}
  • Затем мы передаем эти файлы на локальный сервер.
gsutil -m cp \
       -r ${GCS_TRAIN_EXPORT_PATH}/*.avro \
       $LOCAL_TRAIN_TABLE_PATH
  • Наконец, по соображениям производительности (паркетные файлы лучше оптимизированы для хранения, чем Avro), мы конвертируем файл Avro в паркетные файлы с помощью скрипта python.

Bigquery скоро будет поддерживать экспорт паркета: https://cloud.google.com/bigquery/docs / exporting-data # parquet_export_details

Модельное обучение

Когда у нас есть данные в паркете, мы хотим иметь возможность передавать в наш алгоритм пары положительных и отрицательных наблюдений. Однако наблюдения не хранятся вместе в BigQuery. Таким образом, мы обрабатываем все данные в памяти, чтобы создать эти пары с помощью идентификатора и передать их в сиамскую нейронную сеть.

Мы также могли бы использовать точечный подход, давая каждое наблюдение одно за другим, удаляя этап спаривания (что отнимает много времени). Оффлайн-производительность была аналогичной, но парный подход был более уместным в нашем случае: при небольшом и конечном количестве продаж лучше ранжировать на основе расстояния между продажами, а не на основе самой вероятности заказа. Более того, мы можем составить несколько пар, используя одно и то же положительное наблюдение, объединяя его с разными отрицательными наблюдениями, чтобы повысить производительность модели и ускорить обучение.

Прогнозы обслуживания:

Чтобы предоставить нашим пользователям персонализированный рейтинг продаж, мы разработали gRPC API под названием Agora. Он реализует службу под названием AgoraRecSys, которая принимает Рекомендацию в качестве входных данных и выводит Рекомендацию Ответ, как показано в следующем фрагменте кода.

Вызов модели

Для выполнения прогноза мы развертываем обученную модель (экспортированную в формат SavedModel) с TensorFlow Serving. Мы создали наш собственный образ докера для обслуживания TF, в котором экспортированная модель загружается из облачного хранилища и предоставляется через службу gRPC, которая принимает PredictRequest в качестве входных данных и возвращает PredictResponse с соответствующие баллы (TensorFlow предсказывает прото)

Получив Рекомендацию, Agora запросит обслуживающие базы данных для создания нового PredictRequest для данного идентификатора участника и текущих открытых продаж и отправит его в службу TF для получения прогноза. .

Следовательно, функции закодированы на карте, в которой ключи - это имена, а значения -

TensorProto формы (n, 1) (где n - количество текущих операций):

Служба TensorFlow ответит PredictResponse, подобным этому:

После этого Agora API сможет создать RecommandationResponse, упорядочивая оценки и возвращая соответствующие идентификаторы операций, этот ответ затем может быть использован на передней панели для отображения персонализированного рейтинга.

Проблемы

Чтобы обучить эту модель, мы создаем набор данных функций в BigQuery (1 строка = один пример (положительный или отрицательный)), который будет передан, преобразован и, наконец, загружен в память.

Когда у нас есть все данные в памяти, мы можем создавать наблюдения в виде пар примеров (положительный и отрицательный примеры), используя идентификатор.

Этот метод требует передачи большого количества данных, большого количества памяти и высокой загрузки ЦП. В настоящее время у нас есть выделенный сервер, что не позволяет нам обучать несколько моделей одновременно. Мы могли бы использовать экземпляры GCP, но это станет очень дорого.

Решение

Один из способов решения проблемы - создать пары непосредственно в BigQuery, затем закодировать их в подходящем формате и «на лету» загрузить в нейронную сеть, чтобы потребление памяти оставалось низким.

  • Создание функций: остается прежним. Данные вычисляются с помощью BigQuery и сохраняются как функции также в BigQuery. Однако на этом этапе пары связываются.
  • Кодирование данных: мы используем формат TFRecord для хранения масштабированных и закодированных данных в определенном формате.
  • Обучение. Мы используем недавно выпущенную систему ранжирования Tensorflow: TFRanking.

Новый конвейер: вложение, кодирование, TFRanking

Генерация данных: от плоских данных до вложенных данных

Предыдущая реализация

Каждая строка представляет собой пример с одним значением для каждой функции и меткой:

  • 1: если в результате продажи был получен заказ (положительное наблюдение)
  • 0: еще (отрицательное наблюдение).

Отрицательные наблюдения - это продажи, доступные на сайте в то же время, когда пользователь сделал заказ.

Чтобы связать положительные и отрицательные наблюдения, мы используем столбец id, в то время как значение поля 'label' определяет, является ли наблюдение положительным (1: участник, купленный в этой продаже) или отрицательным (0 : продажа была видна на домашней странице участника, но он не покупал на ней). Однако, поскольку положительные и отрицательные наблюдения для одного и того же члена находились в разных строках в экспортированных данных BigQuery, нам нужно было реализовать этап объединения в обучающий код, извлекая все данные из памяти.

Сложите положительный пример с его отрицательными

Чтобы избежать загрузки всех этих данных в память, мы воспользовались встроенными в BigQuery возможностями обработки вложенных структур данных. Таким образом, мы можем объединить все положительные и отрицательные наблюдения в их общий контекст:

Мы обернем все наши функции в STRUCT и сгруппируем их в их общем контексте, используя ARRAY_AGG следующим образом:

Мы также использовали функцию STRUCT для пользовательских функций для ясности.

Файл JSON, возвращенный для простого `SELECT * LIMIT 1`, выглядит следующим образом:

Позже мы увидим, что эта структура в точности совпадает с входными данными, необходимыми для TFRanking.

Кодирование данных: от примера к ExampleListWithContext

Поскольку загружать данные напрямую из BigQuery будет неэффективно, мы решили закодировать наши данные в файлах TFRecords, используя фреймворк TFX / TFTransform . Формат TFrecord - это протокольный буфер, который дает нам:

  • Возможность не загружать все данные в память. Используя класс набора данных , мы можем получать данные по частям. Tensorflow загружает больше данных, используя ЦП, в то время как графический процессор работает над обучающей частью.
  • Естественная высокоскоростная обработка, поскольку не нужно сначала загружать ваши данные в массив numpy, а затем загружать их обратно в сеанс keras / tenorflow.

В TFX формат Пример - это формат, используемый для кодирования, определяющего наблюдение. Его структура действительно близка к Python dict.

Это хорошо обрабатывается фреймворком Beam с использованием ExampleProtoCoder, и если бы мы использовали только точечный подход, мы могли бы использовать этот формат, так как нам нужно передавать только один пример алгоритму за раз.

Вот пример структуры:

Однако пакет TFRanking, более подробно описанный ниже, работает с форматом данных, который называется ExampleListWithContext (ELWC).

С новым форматом ELWC мы можем встроить особенности продаж (положительные и все отрицательные) в один объект. У этого объекта есть два ключа:

  • Контекст: особенности пользователя: возраст, предыдущий порядок, предпочтения и т. д. Закодировано как один «Пример»
  • Примеры: набор функций. Каждый набор функций (закодированный как пример) содержит характеристики продажи (бренд, популярность и т. Д.) И ее ярлык.

Вот те же данные, но в кодировке ExampleListWithContext:

и вот простой код, который генерирует ELWC:

Конструкция балки

У Beam уже есть протокодер для кодирования данных в TFRecords со структурой Example.

Но в настоящее время для структуры ELWC ничего не существует: https://github.com/tensorflow/transform / issues / 163 # event-3068836464

Чтобы обрабатывать значения None в данных, нам пришлось изменить нашу предыдущую реализацию.

Действительно, метод кодирования ExampleProtoCoder не обрабатывает значения None.

Вот реализация кодера:

Значения по умолчанию при обслуживании

Одним из решений для значения None могла быть замена их значением по умолчанию перед кодированием, либо в BigQuery, либо в коде с методом предварительной обработки Beam.

Однако во время обслуживания данные все еще находятся в необработанном формате. Мы хотим применить функцию преобразования (применяя масштабирование и встраивание) и заменить отсутствующие значения значениями по умолчанию. Для этого нам нужно определить функцию преобразования, которая заменяет значения None значениями по умолчанию.

Таким образом, нам нужно определить функцию преобразования, чтобы они заменяли значения None значением по умолчанию. Цель состоит в том, чтобы минимизировать вычислительные затраты на обслуживание:

Загрузка данных: локальные файлы против BigQuery

Мы сравнили загрузку данных из локального файла с запросом данных из BigQuery в конвейере Beam. Мы заметили, что время загрузки было очень похожим, без учета дополнительного времени на передачу и преобразование локального файла.

Таким образом, мы решили отказаться от локального файла и запрашивать наши данные непосредственно из BigQuery, сделав конвейер Beam как можно более простым.

Недостаток: размер данных.

Так как в BigQuery 40 миллионов строк представляют собой 132,83 ГБ данных, в формате TFRrecord общий размер файлов (1370 файлов) составляет 1,1 ТБ. Это может быть серьезным недостатком, если нужно обрабатывать данные на сервере за пределами GCP. Действительно, затраты на передачу огромны, и при обучении в несколько эпох (например, 50) загрузка данных локально увеличивает конечную стоимость в столько же раз.

Однако, если вы используете экземпляр GCP, передача данных между Google Cloud Storage и экземпляром осуществляется бесплатно.

Рейтинг: от оценщика до Кераса

TensorFlow Ranking - это библиотека методов Learning-to-Rank (LTR) на платформе TensorFlow. Он стартовал в декабре 2018 года и сейчас находится на версии 0.3.1. Он направлен на облегчение разработки крупномасштабных задач ранжирования с помощью структуры глубокого обучения. Он обладает широкими возможностями настройки и предоставляет простые в использовании API-интерфейсы для поддержки различных механизмов оценки, функций потерь и показателей оценки в настройке обучения для ранжирования (источник).

Функции TFRanking

Мы выбрали фреймворк TFRanking, потому что он:

  • Реализует точечно / попарно / по списку
  • Реализует уровень внимания
  • Увеличивает размер группы, используя попарный подход, создавая n² пар.
  • Реализует обучение с использованием Keras API.

Новый пайплайн будет таким:

  • Вложите данные с помощью BigQuery
  • Закодируйте данные в TFRecords в формате ELWC
  • Используйте TFRanking для обучения модели

Вот новый потенциальный трубопровод:

Сервировочная часть

Решающим шагом в создании модели является определение обслуживающей функции. Он будет определять способ подачи входных данных в модель, способ преобразования (масштабирования) этих данных и определение выходных данных (прогнозов).

В предыдущей реализации данные были плоскими и необработанными: когда пользователь подключается к Veepee’s, мы собираем его характеристики и все характеристики текущих продаж. Затем мы просим модель предсказать оценку для каждой продажи и, наконец, оценить их для пользователя.

Предыдущая реализация предоставляет нам очень эффективный метод автономной оценки. Действительно, модель можно загрузить в BigQuery, и мы можем делать прогнозы на основе тестовых данных с помощью функции ML.PREDICT.

Поскольку TFRanking использует формат ELWC, для функции обслуживания по умолчанию требуются данные ELWC в качестве входных данных или, по крайней мере, объект Example. В приведенном примере также пропущена часть нормализации функций: в нашем случае мы масштабируем числовые функции для обучения, и мы не можем масштабировать их на лету, прежде чем передать их в модель. Функция обслуживания должна обработать этот шаг.

  • Некоторые функции необходимо транслировать, поскольку их свежесть имеет решающее значение.
  • Масштабирование должно происходить на лету
  • Замена значения по умолчанию должна производиться на лету
  • Каждая продажа отправляется независимо с единым списком функций.
  • Созданная модель должна быть запрошена BigQuery с ML.Predict

Без конкретной функции обслуживания входом должен быть ELWC, а модель, обновленная на BigQuery, показывает:

Однако совместимость TFRanking и TFTransform не позволила нам правильно управлять категориальными функциями и экспортировать модель в BigQuery в том же формате, что и раньше. Учитывая эти недостатки, мы решили поискать другое решение.

Заключение

Наш текущий подход работает хорошо: мы проверили конвейер для обучения, обслуживания, а также для очень быстрого получения офлайн-результатов. Однако для запуска этого полного конвейера необходимо выполнить несколько шагов по оркестровке на разных платформах, что не позволяет нам полностью автоматизировать работу.

Мы попытались исправить это, кодируя данные в TFRecords с помощью библиотеки TFTRansform и обучая нашу модель с помощью библиотеки TFRanking.

Однако есть еще некоторые проблемы, которые мешают нашему желанию полностью перейти на TFRanking:

  • Совместимость с TFX кажется непростой (Несовместимые формы, Кодировщик ELWC)
  • Созданная модель не может быть загружена в BigQuery или не имеет правильной выходной сигнатуры (проблема)
  • Проект не выглядит очень активным, и проблемы не решаются часто.
  • В кодовой базе используется API-интерфейс Tensorflow Estimator (вместо Keras), что затрудняет модификацию.

По всем этим причинам в настоящее время мы работаем над самодельной реализацией с использованием TF2.3 и Keras.

[1]: Один из наших текущих показателей - это средний рейтинг продаж, по которым был произведен заказ. Наша автономная оценка предсказывает оценку каждой продажи, которую пользователи видели перед своим заказом, и вычисляет рейтинг, который она бы поставила положительным продажам. Усреднение по каждому пользователю дает нам средний показатель рейтинга.

Подтверждение

Спасибо Вивиан Оливерес и Бобу Де Шуттеру за все их исправления и предложения.

Эта статья написана Тиллем, специалистом по анализу данных, и Пьером Хайдара, инженером по данным сообщества vpTech.