как вы проектируете, создаете и запускаете сквозной конвейер машинного обучения в Kubeflow?

В этом блоге-учебнике основное внимание будет уделено запуску конвейера машинного обучения в Kubeflow. Если вы хотите узнать, что такое Kubeflow и как его развернуть на своем локальном компьютере, вы можете прочитать мой предыдущий блог-учебник здесь или как развернуть его на AWS, вы можете прочитать другой мой блог-учебник здесь.

Отказ от ответственности. В этом блоге-учебнике предполагается, что вы уже настроили Kubeflow либо на AWS, либо на своем локальном компьютере.

В проектах машинного обучения сложно отслеживать, когда и где выполняются такие этапы, как подготовка данных, обучение модели и мониторинг, сквозные конвейеры служат пошаговой картой, описывающей, что и когда должно произойти. Конвейеры позволяют создавать проекты машинного обучения, которыми можно легко поделиться, воспроизвести и сшить.

Конвейеры Kubeflow — это набор сервисов и пользовательского интерфейса, позволяющий пользователям создавать конвейеры машинного обучения и управлять ими. Пользователи могут писать свой собственный код или создавать его из большого набора предопределенных компонентов и алгоритмов. Компонент задачи Kubeflow — это код Python, который упакован в образ Docker и выполняет один шаг в конвейере машинного обучения, он может принимать параметры и создавать выходные данные и запускать один или несколько модулей Kubernetes для каждого шага конвейера.(Каждый блок на рис. 1 ниже — это компонент задачи Kubeflow)

Вы можете думать о конвейере Kubeflow как о ориентированном ациклическом графе (DAG) с контейнеризованным процессом на каждом узле. Давайте приступим к строительству одного.

Создание конвейера Kubeflow.

Мы создадим сквозной конвейер машинного обучения для прогнозирования удовлетворенности клиентов телекоммуникационной компании на основе неконтролируемых данных. Конвейер включает в себя получение данных из облачной корзины Google, очистку данных, подготовку функций, определение оценок опыта и вовлеченности, определение оценки удовлетворенности и сохранение моделей в облачной корзине Google для обслуживания моделей. Поскольку объяснение того, что делает каждый компонент, сделает этот блог-руководство слишком длинным, мы рассмотрим некоторые компоненты задач, а остальные будут объяснены в моем репозитории GitHub, ссылка на который находится в конце статьи.
Мы также обсудим два способы создания конвейеров Kubeflow: использование облегченных компонентов и использование компонентов многократного использования. В этом руководстве по блогу мы рассмотрим конвейер Kubeflow, построенный с использованием облегченных компонентов, а повторно используемые компоненты мы рассмотрим в следующем руководстве по блогу. Для начала нам нужно установить Kubeflow SDK, выполнив следующую команду:

$ pip install kfp

Создание легковесных компонентов.

Чтобы создать легкий компонент, мы определяем автономную функцию Python, а затем вызываем kfp.components.func_to_container_op(stand_alone_function), чтобы преобразовать функцию в компонент задачи, который можно использовать в конвейере. Например, для компонента задачи загрузки данных у нас есть функция ниже:

Чтобы преобразовать вышеуказанную функцию в компонент задачи, мы просто пишем:

Обратите внимание на ключевое слово packages_to_install, оно устанавливает все пакеты, необходимые для запуска автономной функции.

К функции компонента задачи предъявляется несколько требований:

  • Функция должна быть автономной.
     – Она не должна использовать какой-либо код, объявленный вне определения функции.
     – Любые операции импорта должны быть добавлены в функцию компонента основной задачи.
     – Любые вспомогательные функции. также должен быть определен внутри основной функции компонента задачи.
  • Функция может импортировать только те пакеты, которые доступны в базовом образе, поэтому вам необходимо указать, какие пакеты устанавливать, используя ключевое слово packages_to_install. (как показано в приведенном выше примере)
  • Если функция работает с числами, параметры должны иметь подсказки типа. Поддерживаемые типы: int, float, bool. Все остальные аргументы передаются в виде строк.
  • Чтобы создать компонент задачи с несколькими выходными значениями, используйте подсказку типа Python typing.NamedTuple.

Вот функция подготовки признаков, которая выполняет следующее: суммирует столбцы в нашем наборе данных, создает новые столбцы, создает два новых фрейма данных и возвращает пути к фреймам данных. Благодаря своей высокой функциональности он отвечает большинству вышеперечисленных требований, как показано ниже:

Чтобы выполнить первое требование, мы импортировали pandas внутри функции компонента задачи и определили вспомогательную функцию sum_agg внутри основной функции компонента задачи. Поскольку каждая функция компонента задачи упакована внутри контейнера во время работы конвейера, любой импорт или любая функция, определенная вне функции компонента задачи, не будет доступна внутри контейнера, выполняющего функцию компонента задачи.

Чтобы получить несколько выходных данных из функции компонента задачи, при определении функции мы используем метод NamedTuple из модуля typing и указываем тип каждого выходного результата, который мы получаем, в наша функция подготовки функций выходными данными были пути к кадрам данных опыта и взаимодействия, и оба они были строкового типа, поэтому у нас есть подсказка типа выходных данных:

NamedTuple("feature_paths", [("exp_path", str), ("eng_path", str)])

Внутри функции компонента задачи мы преобразуем переменные пути с помощью метода namedtuple из модуля collections (Обратите внимание, что импорт происходит внутри функции компонента задачи):

from collections import namedtuple                                 feature_paths = namedtuple("feature_paths", ["exp_path", "eng_path"])

У нас также есть подсказка типа входного параметра в определении feature_prep (input_path: str).

Определение трубопровода.

Обертывание нашей конвейерной функции декоратором @dsl.pipeline преобразует функцию в компонент конвейера, описывающий, как компоненты задач взаимодействуют друг с другом. Как показано ниже;

from kfp import dsl                                                           @dsl.pipeline(name="telco_pipeline", description="lightweight component Telco pipeline for the medium article")
def telco_pipeline():

Функция конвейера для нашего проекта показана ниже (вы можете найти функции компонента задачи clean_data, find_eng_and_exp_score и find_satisfaction, определенные в связанном репозитории GitHub):

Во-первых, мы используем класс VolumeOp из модуля kfp.dsl, этот компонент задачи транслируется в шаблон ресурса, который создает постоянное требование тома (PVC). Это создает постоянный том, который мы можем присоединить к другим компонентам задачи и можем читать и записывать в него данные в зависимости от указанного режима. Здесь функция download_data загрузила наши данные, а функция подготовки функций сохранила наши данные. Ключевое слово resource_name определяет имя создаваемого PVC, ключевое слово size определяет размер создаваемого PVC, а ключевое словоmodes определяет режимы доступа к PVC. Как показано ниже;

data_op = dsl.VolumeOp(name="create-pvc",                                            resource_name="data-volume",                               size="2Gi",                                                        modes=dsl.VOLUME_MODE_RWO)

Следующим шагом является преобразование всех функций в компоненты задачи с помощью функции kfp.components.func_to_container_op() (как описано ранее во фрагменте кода 2). Затем мы присоединяем созданный постоянный том к компоненту задачи, используя функцию add_pvolumes(), которая принимает словарь, содержащий ключ, парное значение точки монтирования тома и тома из компонента задачи тома соответственно. . Как показано ниже;

step1 = download_data_op().add_pvolumes({"/data": data_op.volume})

Затем мы определяем порядок, в котором должны выполняться компоненты задачи. Мы также можем передать выходные данные из более ранних компонентов задачи в компоненты текущей задачи, вызвав earlier_task_component.output . В случае, когда у нас есть несколько выходных данных, мы указываем имя нужного выходного файла, т.е. earlier_task_component.outputs["name_of_output"] . Как показано ниже;

step2 = clean_data_op(step1.output).add_pvolumes({"/data": data_op.volume}) 
...
step4 = find_eng_and_exp_score_op(step3.outputs["exp_path"], step3.outputs["eng_path"]).add_pvolumes({"/data": data_op.volume})

Запуск трубопровода.

Существует два способа запуска конвейера Kubeflow. Мы можем запустить его непосредственно из нашего кода, создав соединение с нашими кластерами Kubeflow с помощью SDK (kfp), который мы установили на самом первом шаге. Чтобы запустить наш конвейер, мы используем код ниже.

Мы используем kfp.Client() для подключения к нашим кластерам Kubeflow и указываем конечную точку конвейера для подключения к конвейерам Kubeflow. Вы можете найти инструкции о том, как получить authservice_session_cookie здесь.

ПРИМЕЧАНИЕ. Если вы не можете найти файл cookie сеанса в своем браузере. вы можете следовать моему руководству о том, как получить authservice_session_cookie здесь.

Чтобы мы могли запустить конвейер, мы должны создать эксперимент. Думайте об эксперименте как о названии проекта, для которого требуется сквозной конвейер машинного обучения. Поскольку конвейер работает внутри эксперимента, мы можем иметь несколько запусков одного и того же конвейера или разных конвейеров в одном эксперименте.

kfp.complier.Compiler().compile(telco, "telco_pipeline.zip")преобразует конвейер в ZIP-файл, который хранится в том же каталоге, что и наш скрипт. Затем мы передаем этот файл в метод client.run_pipeline() вместе с идентификатором эксперимента, созданного ранее. Это запустит наш конвейер для нас.

если мы проверим нашу панель инструментов Kubeflow, мы увидим, что конвейер работает;

Мы также можем запустить конвейер, создав эксперимент вручную на панели инструментов Kubeflow, загрузив конвейер и создав запуск для конвейера. Я не буду вдаваться в подробности, так как Kubeflow настроил процесс максимально интуитивно. После того как вы загрузите ZIP-файл, созданный с помощью метода kfp.complier.Compiler().compile(telco, "telco_pipeline.zip"), процесс просто пойдет оттуда, и все, что вам нужно сделать, это нажать кнопки и добавить описательный текст.

При успешном выполнении у нас должна получиться диаграмма DAG, похожая на ту, что была на рисунке 1.

Заключение

В этом блоге-учебнике (третьем в серии Kubeflow) мы узнали об одном способе создания и запуска сквозного конвейера машинного обучения в Kubeflow. Код, написанный для всего проекта, вы можете найти здесь. Этот метод упрощает создание сквозного конвейера, чем повторно используемые компоненты, поскольку вам не нужно самостоятельно настраивать контейнеры докеров и писать соответствующие файлы yaml со спецификациями компонентов. Однако метод повторно используемых компонентов лучше подходит для повторного использования, как мы увидим в следующем (и последнем) руководстве по блогу в серии Kubeflow. Увидимся на следующем!