Spark broadcast что это
Packages
Core Spark functionality.
Core Spark functionality. org.apache.spark.SparkContext serves as the main entry point to Spark, while org.apache.spark.rdd.RDD is the data type representing a distributed collection, and provides most parallel operations.
In addition, org.apache.spark.rdd.PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as groupByKey and join ; org.apache.spark.rdd.DoubleRDDFunctions contains operations available only on RDDs of Doubles; and org.apache.spark.rdd.SequenceFileRDDFunctions contains operations available on RDDs that can be saved as SequenceFiles. These operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit conversions.
Java programmers should reference the org.apache.spark.api.java package for Spark programming APIs in Java.
Classes and methods marked with Experimental are user-facing features which have not been officially adopted by the Spark project. These are subject to change or removal in minor releases.
Classes and methods marked with Developer API are intended for advanced users want to extend Spark through lower level interfaces. These are subject to changes or removal in minor releases.
Spark’s broadcast variables, used to broadcast immutable datasets to all nodes.
Spark’s broadcast variables, used to broadcast immutable datasets to all nodes.
abstract class Broadcast [ T ] extends Serializable with Logging
A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
Type of the data contained in the broadcast variable.
Мифы о Spark, или Может ли пользоваться Spark обычный Java-разработчик
Продолжаем расшифровывать и местами облагораживать хардкорные доклады спикеров JPoint 2016. Сегодня доклад поменьше, всего час с копейками, соответственно, концентрация пользы и отжига на одну минуту зашкаливает.
Итак, Евгений EvgenyBorisov Борисов о Spark, мифах и немного о том, дествительно ли тексты Pink Floyd адекватнее, чем у Кэти Пэрри.
Это будет необычный доклад о Spark.
Обычно много рассказывают про Spark, какой он крутой, показывают код на Scala. Но у меня немного другая цель. Во-первых, я поговорю о том, что такое Spark и зачем он нужен. Но основная цель — показать, что вы, как Java-девелоперы, можете прекрасно им пользоваться. В этом докладе мы развеем несколько мифов о Spark.
Коротко о себе
Я был Java-программистом с 2001 года.
К 2003 году параллельно начал преподавать.
С 2008 начал заниматься консультациями.
С 2009 года занимался архитектурой разных проектов.
Стартап свой открыл в 2014.
С 2015 года я являюсь technical leader по Big data в компании Naya Technologies, которая внедряет big data везде, где только может. У нас огромное количество клиентов, которые хотят, чтобы мы им помогли. Нам катастрофически не хватает людей, которые разбираются в новых технологиях, поэтому мы постоянно ищем работников.
Мифы о Spark
Мифов о Spark ходит довольно много.
Во-первых, есть какие-то концептуальные мифы, о которых мы поговорим подробнее:
Посмотрим, какие из этих мифов получится опровергнуть.
Миф 1. Spark и Hadoop
По большому счету Hadoop является просто хранилищем информации. Это распределенная файловая система. Плюс к нему предлагается определенный набор инструментов и API, при помощи которого эту информацию можно процессить.
В контексте Spark правильнее сказать, что это не Spark нуждается в Hadoop, а наоборот, потому что информацию, которую можно хранить в Hadoop и процессить при помощи его инструментов (сталкиваясь с проблемой быстродействия), быстрее можно процессить при помощи Spark.
Вопрос в том, нуждается ли Spark в Hadoop, чтобы работать?
Вот определение Spark:
Разве здесь есть слово Hadoop? Тут есть модули Spark:
Давайте поговорим просто про Spark.
Эта идея зародилась в Университете Беркли примерно в 2009 году. Первый релиз вышел не так давно — в 2012. Сегодня мы находимся на версии 2.1.0 (она вышла в конце 2016 года). На момент озвучивания этого доклада актуальна была версия 1.6.1, но обещали скорый выход Spark 2.0, где почистили API и добавили много новых полезных вещей (нововведения Spark 2.0 здесь не учтены).
Написан сам Spark на Scala, что объясняет миф о том, что использовать Spark лучше при помощи Scala, потому что получается нативный API. Но помимо Scala API существует для:
Скажу буквально два слова про data locality. В чем идея обработки big data, которые находятся не на одной машине, а на большом их количестве?
Когда мы пишем какой-то код, работающий, например, с jdbc или ORM, фактически что происходит? Есть машина, которая запускает Java-процесс, и когда в этом процессе бежит код, обращающийся к базе данных, все данные вычитываются из БД и перегоняется туда, где работает этот Java-процесс. Когда мы говорим про big data, это сделать невозможно, потому что данных слишком много — это неэффективно и у нас образуется горлышко бутылки. Кроме того, data и так уже распределенная и изначально находится на большом количестве машин, поэтому правильнее не data тянуть к этому процессу, а код распределять на те машины, на которых мы хотим эту «дату» обрабатывать. Соответственно, это происходит параллельно на многих машинах, мы задействуем неограниченное количество ресурсов, и вот здесь нам нужен кластер-менеджер, который будет координировать эти процессы.
На этой картинке вы видите, как все это работает в мире Spark.
У нас есть Driver — наш main, который запускается на отдельной машине (не имеющей отношения к кластеру). Когда мы сабмитим наше Spark-приложение, мы обращаемся к Yarn, который является ресурс-менеджером. Мы ему говорим, сколько worker-ов задействовать под наш Java-процесс (например, 3). Он из кластерных машин выбирает одну машину, которая будет называться Application Master. Ее задача — получить код и найти в кластере три машины для его выполнения. Находятся три машины, поднимаются три отдельных Java-процесса (три executor-а), где запускается наш код. Потом это все возвращается Application Master, и в конечном итоге он возвращает это напрямую на Driver, если мы хотим результат операции над big data получить обратно туда, откуда код вышел.
Это напрямую не связано с тем, о чем я сегодня буду говорить. Просто в двух словах о том, как Spark работает с Cluster Manager (в данном примере с Yarn) и почему мы не ограничены в ресурсах (разве что в денежных — сколько мы можем позволить себе машин, памяти и т.д.). Это все немного похоже на классический MapReduce — старый API, который был в Hadoop (в принципе он есть и сейчас), с той только разницей, что когда этот API писался, машины были недостаточно сильными, промежуточные результаты данных можно было хранить только на диске, потому что в оперативной памяти не было достаточно места. Поэтому все это работало медленно. В качестве примера могу сказать, что мы недавно переписали код, который был написан на старом MapReduce и он бежал в районе 2,5 часов. Сейчас он работает 1,5 минуты на Spark, поскольку Spark хранит все в оперативной памяти — намного быстрее получается.
Очень важно понимать, когда вы пишите код, что одна его часть будет исполняться на кластере, а другая — на Driver-е. У людей, которые этого не понимают, очень часто случаются всякие OutOfMemory и т.д. (мы про это поговорим — я покажу примеры этих ошибок).
Итак, Spark… поехали
RDD (resilient distributed dataset) — это основной компонент, на котором работает весь Spark.
Давайте начнем с термина dataset — это просто хранилище информации (Collection). У него API очень похож на Stream. По сути, как и Stream, он не является хранилищем данных, а некой абстракцией на данными (в данном случае ещё и распределёнными) и позволяет запускать на эти данные всякие функции. В отличие от Stream, RDD изначально Distributed — находится не на одной машине RDD, а на том количестве машин, которое при запуске Spark мы разрешили использовать.
Resilient говорит том, что его не убьешь, потому что если какая-то машина в процессе обработки данных отключилась (что-то там случилось, например, вырубили свет), кластер-менеджер сможет поднять другую машину и передиплоить туда java-процесс, и RDD восстановится. Мы даже этого не почувствуем.
Откуда можно получить RDD?
Мы чуть позже обсудим, что такое sc (это такой стартовой объект Spark). Здесь мы создаем RDD:
Так создается RDD из памяти:
У вас есть метод parallelize, который принимает list и превращает его в RDD.
Теперь мы подходим к вопросу, что такое sc, который мы постоянно использовали для получения RDD. Если мы работаем со Scala, этот объект называется SparkContext. В мире Java API он называется JavaSparkContext. Это основная точка, с которой мы начинаем писать код, связанный со Spark, потому что оттуда мы получаем RDD.
Вот пример, как конфигурируется объект Spark-контекста на Java:
Создается сначала объект Spark-конфигурации, он настраивается (вы говорите, как называется запускаемое приложение), далее указываете, работаем мы локально или нет (звездочка говорит, сколько найдешь thread-ов, столько и можно использовать; можно указать 1, 2 и т.д.). И потом я создаю JavaSparkContext и передаю сюда конфигурацию.
Тут возникает первый вопрос: а как же все разделить? Если я SparkContext создаю таким образом и передаю ему сюда конфигурацию, это не будет работать на кластере. Мне надо разделить, чтобы на кластере здесь у меня вообще ничего не было написано (потому что в момент запуска Spark-процесса нужно сказать, сколько надо использовать машин, кто у нас мастер, кто у нас кластер-менеджер и так далее). Я не хочу, чтобы эта конфигурация здесь была; я хочу оставить только application name.
И тут на помощь приходит Spring: мы делаем два bean-а. Один у нас под профилем production (он вообще никакой информации не передает о том, кто у нас мастер, сколько машин и т.д.), другой под профилем local (и здесь я эту информацию передаю; можно сразу легко разделить). Для тестов будет один bean работать из SparkContext, а для продакшн — другой.
Вот список функций, которые есть у RDD.
Они очень похожи на функции Stream: тоже все Immutable, тоже возвращают RDD (в мире Stream это называлось intermediate operations, а тут — transformations). В подробности мы сейчас вдаваться не будем.
Также есть Actions (в мире Stream-ов это называлось terminal operations).
Как определить, что Action, а что Transformation? Как и в стримах, если RDD-метод возвращает RDD, это Transformation. Если нет — значит это Action.
Action существует двух видов:
Эта схема похожа на стримы, но здесь есть один маленький нюанс. У нас есть какая-то data, которая находится, допустим, в s3 storage. Я при помощи SparkContext создал свой первый RDD1. Потом я делаю всякие разные трансформации, каждая из которых возвращает мне RDD. В конечном итоге я выполняю Action и получаю какую-то пользу (сохранил, распечатал или переслал то, что у меня получилось). Этот кусок, естественно, выполняется на кластере (все RDD-методы запускаются на кластере). Маленький кусочек в конце будет запускаться на Driver в том случае, если итогом станет какой-то ответ. Все, что слева от Data (т.е. до того, как я начал пользоваться кодом Spark) — тоже будет запускаться на Driver, а не на кластере.
Все это Lazy — точно так же, как в стримах. Каждый метод RDD, который является трансформацией, ничего не делает, а ждет Action. Когда будет Action, вся цепочка запустится. И тут возникает классический вопрос: а что мы делаем вот в таком случае?
Представьте, что моя data — это все денежные транзакции за последние 5 лет в каком-то банке. И мне надо провести достаточно длинную обработку, а дальше она разделяется: для всех мужчин я хочу сделать один Action, а для всех женщин — другой. Допустим, у меня первая часть процесса займет 10 минут. Вторые части процесса потребуют по минуте. Казалось бы, у нас должно получиться в сумме 12 минут?
Нет, у нас получается 22 минуты, потому что Lazy — каждый раз, когда запускается Action, прогоняется вся цепочка от начала до конца. В нашем случае общий кусок запускается только 2 раза, но если бы у нас было 15 разветвлений?
Естественно, это очень сильно бьёт по перформансу. В мире Spark очень легко писать код, особенно людям, которые хорошо знакомы с функциональным программированием. Но ерунда получается. Если хотите писать эффективный код, надо знать многие особенности.
Давайте попробуем решить проблему. Чтобы мы сделали в стримах? Сделали бы какой-то collect, собрали это все в collection, а потом из нее вытаскивали бы стримы.
В GetTaxi пробовали, но получилось вот так:
Причем, они собирались докупить еще машин на кластер, чтобы их было 40 штук и у каждой по 20 джигабайт оперативной памяти.
Надо понимать: если мы говорим про big data, в тот момент, когда вы делаете collect, вся информация из всех RDD возвращается к вам на Driver. Поэтому джигабайты и машины им никак не помогают: когда они делают collect, вся информация сливается в одно место, откуда запустилось приложение. Естественно, получается out of memory.
Как решаем эту проблему (дважды цепочку прогонять не хочется, 15 — тем более, а collect делать нельзя)? Для этого в Spark есть метод persist:
Persist позволяет сохранить state RDD, причем можно выбрать, куда сохранять. Вариантов сохранения много. Самый оптимальный — в память (есть memory only, а есть memory only 2 — с двумя бекапами). Можно даже написать свой custom storage и сказать, как это сохранять. Можно сохранять memory and disk — попытаться сохранить в память, но если у данного worker (у машины, которая этот RDD запускает) нет достаточного объема оперативной памяти, часть запишется в память, а остатки сбросятся на диск. Вы можете сохранять данные как объект или делать сериализацию. У каждого из вариантов есть свои плюсы и минусы, но такая возможность есть, и это прекрасно.
Мы победили эту проблему. Persist — это не action. Если не будет никаких action, persist не сделает ничего. Когда запустится первый action, вся цепочка прогоняется и в конце первой части цепочки RDD персистится на все машины, где находится data. Когда мы запускаем action RDD6, начинаем уже с persist (если бы были другие ответвления, то продолжали бы с точки, которую «запомнили» или «пометили» persist).
Миф 2. Spark пишем только на Scala
Spark — здорово, его можно применять даже для каких-то локальных нужд, не обязательно для big data. Можно просто использовать его API для обработки данных (он реально удобный). Возникает вопрос: на чем писать? Python и R я отмел сразу. Будем выяснять: Scala или Java?
Что думает обычный Java-девелопер о Scala?
Продвинутый Java-девелопер видит чуть больше. Он знает, что там есть какой-то play, какие-то классные фреймворки, лямбды и очень много китайского.
Помните попу? Вот она. Так выглядит код на Scala.
Я не буду вдаваться сейчас в API Scala, потому что моя конечная цель — убедить вас, что писать на Java ничуть не хуже, но этот код считает длину каждой строки и суммирует все это дело.
Очень сильный аргумент против Java в том, что тот же самый код на Java выглядит вот так:
Когда я начинал первый проект, начальство спрашивало, уверен ли я? Ведь когда мы будем писать, кода будет все больше и больше. Но это все ложь. Сегодняшний код выглядит так:
Вы видите сильную разницу между Scala и Java 8? Мне кажется, для Java-программистов это более читабельно. Но даже несмотря на Java 8, мы приходим к мифу, что Spark надо писать на Scala. Чем люди, которые знают, что в Java 8 все не так плохо, аргументируют, что надо писать на Scala?
Послушайте подкаст — Выпуск 104 — в котором обсуждают, что произошло.
Я в двух словах расскажу.
Год назад Martin Odersky, который в 2010 году открыл компанию Typesafe, закрыл ее. Нет больше компании Typesafe, которая поддерживает Scala.
Это не значит, что Scala умерла, поскольку вместо Typesafe открылась другая компания — Lightbend, но у нее совершенно другая бизнес-модель. Они пришли к выводу, что даже благодаря классным вещам, написанным на Scala, как Play, Akka и Spark, и даже благодаря упомянутой выше попе, невозможно заставить массы перейти работать на Scala. Год назад Scala находилась на пике популярности, несмотря на это она не входила даже в первые 40 мест в рейтинге. Для сравнения — Groovy был на двадцатом, Java — на первом.
Когда они поняли, что даже на пике популярности все равно не заставили людей использовать Scala в массах, то признали свою бизнес-модель неправильной. У компании, которая сегодня будет пилить Scala, другая бизнес модель. Они говорят, что все продукты, которые будут делаться для масс, вроде Spark, будут иметь отличный Java API. И когда мы дойдем до датафреймов, вы увидите, что там уже нет никакой разницы, писать на Scala или на Java.
Миф 3. Spark и Spring несовместимы
Во-первых, я вам уже показал, что у меня есть SparkContext, который прописан как bean. Далее мы увидим, как при bean постпроцессора мы сможем поддерживать некоторый функционал для Spark.
Давайте уже писать код.
Мы хотим написать сервис (вспомогательный), который принимает RDD строк и количество топовых слов. Его задача — вернуть топовые слова. Давайте посмотрим в коде, что мы делаем.
Во-первых, поскольку мы не знаем, в lowercase или в uppercase у нас слова песен, нам надо все перевести в lowercase, чтобы слова с большой и с маленькой буквы мы не считали два раза. Поэтому мы пользуемся функцией map. После этого нужно превратить строки в слова с помощью функции flatmap.
Теперь у нас есть RDD, в котором присутствуют слова. Мы его map-ируем против их количества. Но сначала надо просто каждому слову единичку приписать. Это будет классический паттерн: у нас будет слово — 1, слово — 1, потом все еденички против одинаковых слов надо будет суммировать и отсортировать (все работает в памяти, и никакие промежуточные результаты не сохраняются на диске если памяти достаточно).
У нас есть функция mapToPair — сейчас мы уже будем создавать пары. Проблема в том, что в Java нет класса Pair. На самом деле это большое упущение, потому что очень часто у нас есть какая-то информация, которую в определенном контексте хочется соединить, но писать под это класс глупо.
У Скалы есть готовые классы (их очень много) — Tuple. Есть Tuple2, 3, 4 и т.п. до 22. Почему до 22? Не знает никто. Нам нужен Tuple2, потому что мы мапируем 2.
Теперь все это надо reduce-ить. У нас есть метод reduceByKey, который все одинаковые слова оставит ключом, а со всеми value сделает то, что попрошу. Нам надо сложить. У нас получились пары: слово — количество.
Теперь надо отсортировать. Тут у нас опять небольшая проблема с Java, т.к. единственное, что у нас есть sort — это sorkByKey. В API Scala есть просто sortby и там вы берете этот Tuple и вытаскиваете из него все, что хотите. А здесь — только SortByKey.
Как я и говорил, пока еще в некоторых местах мы чувствуем, что Java API недостаточно богат. Но выкрутиться можно. К примеру, можно перевернуть нашу пару. Для этого мы еще раз делаем mapToPair, и у Tuple есть встроенная функция swap (получилась пара количество — слова). Теперь мы можем делать sortByKey.
После этого надо вытащить не первую, а вторую часть. Поэтому делаем map. Для вытаскивания второй части у Tuple есть готовая функция «_2». Теперь делаем Take(x) (нам же нужно только x слов — метод называется TopX), и этому всему можно будет сделать return.
Я покажу, как делается тест. Но до этого посмотрите, что у меня в Java config на Spring (мы работаем на Spring, и это не просто класс, а сервис).
В Java config я читаю какой-то user.properties (я потом объясню, зачем; сейчас я его все равно не использую). Также я сканирую все классы и прописываю два bean: PropertySourcePlceholderConfigurer — чтобы можно было инжектить что-то из property-файлов, это пока не актуально; и единственный bean, который нас сейчас интересует — это обычный JavaSparkContext.
Я создал SparkConf, настроил его (программа называется music analyst), сказал ему, что у нас мастер (мы работаем локально). Мы создали JavaSparkContext — все замечательно.
Теперь смотрите тест.
Поскольку мы работаем со Spring, раннер, естественно, спринговый. Наша конфигурация — это AppConfig (правильно было бы сделать разные конфигурации для тестирования и для продакшн). Далее мы инжектим сюда JavaSparkContext и тот сервис, который хотим проверять. При помощи SparkContext я пользуюсь методом parallelize и передаю туда строку «java java java scala grovy grovy». Далее запускаю метод и проверяю, что Java — это самое популярное слово.
Тест упал. Потому что самое популярное — scala.
Что я забыл сделать? Когда я делал Sort, надо было сортировать в другую сторону.
Исправляем в нашем сервисе:
Теперь попробуем запустить main и посмотреть результат на реальной песне. У меня есть директория data, там есть папка Beatles, в которой лежит текст единственной песни: yesterday. Как вы думаете, какое самое популярное слово в yesterday?
Здесь у меня сервис ArtistsJudge. Мы имплементировали метод TopX — он принимает имя артиста, добавляет директорию, в которой находятся песни этого артиста, а дальше использует метод topX уже написанного сервиса.
Main у меня выглядит так:
Итак, самое популярное слово — это не yesterday, это «i»:
Согласитесь, это не очень хорошо. У нас есть мусорные слова, которые не несут смысловой нагрузки (в конечном итоге мы хотим анализировать, насколько песни Pink Floyd более глубокие и нам такие слова будут сильно мешать).
Поэтому у меня был файл userProperties, в котором определены мусорные слова:
Можно было бы сразу инжектить этот garbage в наш сервис, но я так делать не люблю. У нас есть UserConfig, который будет передаваться в разные сервисы. Каждый будет вытаскивать из него то, что ему нужно.
Обратите внимание, я использую private для сеттера и public для самого property. Но не будем на этом зацикливаться.
Мы идем в наш PopularWordsServiceImpl, делаем Autowired этому UserConfig и фильтруем все слова.
Запускаем тот же main.
Смотрите, что у нас произошло (это важный момент):
Все упало, потому что not serializable. Давайте об этом поговорим. Чтобы вы не сомневались, UserConfig — serializable.
Но у меня не serializable мой PopularWordsServiceImpl:
Сейчас я его сделаю serializable:
Когда вы в map-функции (или любой другой функции, которая бежит на кластере) начинаете пользоваться state-ом какого-то объекта, этот объект должен на кластер уходить и сериализоваться. Т.е. если я использую UserConfig внутри своих функций, он должен быть serializable. Но фишка в том, что этот UserConfig является частью моего сервиса, значит сервис тоже должен сериализоваться. Это можно хитро обойти, но проще сделать serializable.
В итоге все работает. На первое место попал yesterday. На втором месте — oh, на третьем — believe. Я специально не вносил oh в слова-мусор, потому что для Бритни Спирс это очень важное слово.
Но что происходит, когда я говорю, что UserConfig должен пойти на все worker? Он будет для каждой строчки туда идти? Не будет ли это бить по перформансу? Тут мы снова возвращаемся к тому, что писать код на Spark легко, а вот чтобы писать эффективный код, надо кое-что знать.
Давайте поговорим о следующем мифе, который связан с broadcast-ом.
Миф 4. Есть случаи, когда без broadcast не работает
Вам надо позаботиться о том, чтобы на worker-ы попала общая для них data (как UserConfig в примере выше). Я встречал людей, которые говорили, что обязательно надо делать broadcast, иначе это не будет работать. Но это будет работать (как вы видели), broadcast делать не обязательно.
Есть 2 варианта, как это реализовать:
Это сокращенный пример из одного из рабочих проектов.
У меня есть строки в файле (там их миллиарды), где указаны страна и номер телефона. Мне надо отфильтровать те страны, которые нас не интересуют. Какие именно — прописано в конфиге в каком-то property-файле и его надо дистрибьютить на все worker-ы. После этого мне опять же из конфига надо взять информацию о префиксах телефона и смапировать это против оператора, чтобы получить названия телефонных компаний, обслуживающих номера:
У меня есть какой-то Excel-файл, который говорит, что 054 — это Orange, а 911 — это МТС. Это не такой уж большой файл (10 Кб; даже если бы он был 2 Мб — это было бы ничто по сравнению с big data) и его надо дистрибьютить.
В конечном итоге мы хотим остаться с этими телефонными компаниями:
Как у меня выглядит код?
Есть интерфейс CommonConfig, который умеет распознавать оператора по телефону и выяснять, какие страны нас интересуют.
Код выглядит примерно так же, как в ситуации выше с мусором:
Есть какой-то сервис на Spring, он в себя получает этот конфиг и пользуется им, чтобы сначала отфильтровать, а потом смапировать data.
Это неэффективно! Сейчас объясню, почему (и почему в данном случае надо делать broadcast).
Что на самом деле происходит? Конфиг будет транспортироваться с Driver на Worker-ы, конечно, не на каждую строчку, намного больше, чем 1 раз. Возьмем простой пример. Представьте, что у вас есть файл на несколько Тб и его надо обработать. У него есть логическое разделение на разделы и, допустим, это 1000 таких разделов. А когда я запускаю свой Spark-процесс, у меня есть только 10 Worker-ов.
Worker-ы будут как-то делить работу. Их 10, разделов 1000, каждый возьмет на себя 100 тасков. Каждый закачивает в себя первый кусок информации, запускает этот код, информацию хранит в памяти, берет следующий кусок и т.д. И на каждый таск он будет туда-сюда тягать конфиг (и не 1 раз, а 2, т.к. я им пользуюсь 2 раза). Поэтому практически всегда, когда информация нужна всем worker-ам и она не измеряется в джигабайтах, имеет смысл сделать broadcast.
В плане синтаксиса это выглядит вот так:
У объекта есть context, метод, который называется broadcast, и здесь вы передаете ту переменную, которую хотите broadcast-ить. Когда этот код исполнится, переменная пойдет на все worker-ы и будет там сидеть до конца процесса.
В чем проблема? Нарушается инверсия контроля:
Нам здесь нужен context (так, казалось бы, надо написать на Spring). Я хочу broadcast-ить в самом начале, поэтому у нас есть метод PostConstruct, который называется wrapWithBroadcast. При помощи SparkContext он броадкастит то, что ему нужно. Это происходит один раз в PostConstruct.
Далее мы вытаскиваем значения (так из broadcast можно вытащить то, что он держит):
Мы сюда инжектили SparkContext, он весь здесь сидит. Это нарушение инверсии контроля. Получается много copy-paste, поскольку логику, которую делает этот broadcast, необходимо будет переносить во все сервисы.
Вот это придется copy-pastИть везде.
Получается, что технический код Spark просачивается в бизнес-код (логика broadcast — это технический код). Это усложняет тесты, для каждого сервиса придется делать моки на SparkContext, который сидит внутри.
Есть и еще один аргумент:
Когда вы инжектили SparkContext в сервис, он перестал быть serializable.
Спор о том, надо это делать или нет, можно продолжать вечно, но в этом нет смысла. Поэтому можно попробовать сделать так:
Хочешь делать broadcast? Откажись от того, чтобы декларировать его как bean:
Добавь сюда переменную в виде broadcast-а, поставь сеттер и потом в своей конфигурации ты его будешь прописывать как bean и передавать ему через сеттер переменную, завернутую в broadcast.
Вы себе представляете, насколько это некрасиво, какому количеству сервисов нужен этот broadcast? Получается, вместо того, чтобы эти сервисы прописывать Service, придется их все описать в конфигурации, чтобы не давать broadcast.
Вместо этого надо пойти и сделать правильно.
Здесь я буду держать broadcast своего UserConfig и воспользуюсь аннотацией AutowiredBroadcast. Так красивее, правда?
Только теперь здесь:
надо будет делать UserConfig.value, и все отлично будет работать.
Понятно, что есть bean-постпроцессор, который поддерживает эту аннотацию.
Подведем итоги по сравнению синтаксиса.
Сверху то, что мы написали на Java (с учетом мусора и т.д.). Ниже — как это выглядело бы на Scala. Несмотря на то, что это Java 8, справа все равно в 2 раза короче. Кроме того, у нас есть всякие такие фишки:
В Java у нас целая функция GetWords, которая написана на полэкрана и для проверки использует регулярные выражения. А в Scala можно просто превратить кусок в регулярное выражение и искать в нем. Также в Scala есть SortBy, который пришлось заменить Tuple, при этом на Scala он более читаемый (это ascending false, а не непонятный false).
Чем будем крыть? У нас есть датафреймы.
DataFrames — замечательный API, который вышел чуть больше полутора лет назад со Spark 1.3. Он превращает все в таблицу, допускает обращение по названиям колонок (вместо использования Tuple). Под капотом там RDD, но т.к. этот RDD он генерит сам, внутри куча оптимизаций — это работает намного быстрее. Датафреймы требуют намного меньше памяти (если их умно использовать), потому что можно интересоваться только одной колонкой и для определенного task-а только она будет вычитываться.
Датафреймы можно создавать из:
Любители SQL могут реализовывать нечто подобное:
Можно взять датафрейм, зарегистрировать как таблицу и запускать на нее SQL при помощи sqlContext.
Правда, может получиться и так:
Поэтому правильнее пользоваться датафреймами и их функциями:
Разберем тот же пример, но на датафреймах. У нас есть файл с данными разных людей:
Здесь есть возраст и keywords (технологии, с которыми люди работают).
Сейчас я запущу main.
Во-первых, гораздо удобнее всю эту информацию получить в организованную структуру, которая называется датафрейм. Вы просто пишите sqlContext.read.json и указываете директорию (вы видели, там был не один json, а очень много — я их просто в один файл покидал; каждый json описывает информацию одного человека). После этого я делаю show. Первая таблица выглядит так:
Он мне сделал таблицу с колонками: возраст, keywords, имя. Но ключевые слова через запятую записаны. А у меня задача найти самую популярную технологию, а потом найти людей меньше 30 лет (потому что им можно меньше платить), которые в этой технологии разбираются.
Воспользуюсь для этого функциями датафреймов. Начальный датафрейм называется linkedIn. Я делаю select, и дальше меня интересует колонка keywords. Но я знаю, что в этой колонке массив через запятую, поэтому я делаю explode (и называю новую колонку keyword в единственном числе).
Строка возвращает другой датафрейм. Смотрите, как он выглядит:
По этой колонке я уже буду делать sort и т.д. Но здесь она пока с повторениями.
Теперь это надо сгруппировать. Я группирую по keyword:
Дальше я объясняю, что надо сделать в процессе группировки. Я хочу посчитать keyword-ы и назвать эту колонку amount. Потом я хочу это отсортировать по колонке amount в descended порядке (никаких false). И снова показываю:
Тут уже нет никаких повторений и возле каждого слова есть количество. После этого мы вытаскиваем самое популярное слово:
Мы берем просто first — он возвращает строчку, из этой строчки я вытаскиваю string (работаю как с resultset-ом). После этого я возвращаюсь к начальному датафрейму, где указаны все люди:
Выбираю людей с возрастом менее 30 лет и имеющих указанное ключевое слово. Для этого есть функция, которая называется functions.array_contains. После этого делаю show. Вот результат:
Есть еще один очень важный миф. Люди говорят: с XML-файлами, с JSON-файлами, с таблицами все прекрасно. А когда просто слова надо посчитать (файл не имеет структуры)? Нельзя же датафреймы использовать, и тогда Java снова проигрывает Scala? Это не так, поскольку можно создать структуру из чего угодно.
У меня для этого есть класс WordDataFrameCreator.
Смотрите, что я делаю:
Он принимает директорию. Вначале с обычным RDD он просто считывает все файлы и map-ирует по словам. А дальше у нас есть класс, который называется RowFactory — я хочу построить строчки из этого RDD. Возвращается у меня пока RDD, но это уже RDD строчек, из которого можно построить датафрейм, если рассказать, какая должна быть структура, т.е. какие колонки, как их назвать и какой там тип — я это делаю далее. Опять же используется SqlContext.
Кстати, строится SqlContext из JavaSparkContext обычным способом (если мы посмотрим AppConfig, то у меня SqlContext занимает буквально одну строчку). Вот, как он строится:
При помощи SqlContext я создаю датафрейм, передаю ему RDD, где есть все данные, и рассказываю, как выглядит структура — передаю массив колонок (пока есть только одна колонка, которая называется words, имеет тип string и является обязательной — true).
После создания этой штуки можно пользоваться API датафреймов: добавлять и убирать колонки, проводить с ними какие-то манипуляции.
В анализаторе песен это будет выглядеть вот так:
Теперь сервис, который считает популярные слова, работает не с RDD, а с датафреймами. API совсем другой.
Сначала мы переводим в lower case все слова. withColumn — это метод, который добавляет колонку. Если ее название такое же, как было, одна колонка просто заменяется другой. Далее фильтруем мусорные слова, группируем и добавляем колонку count с количеством слов, а затем — сортируем в descended-порядке. Далее показываем сколько-то элементов.
Выше я пользуюсь встроенными функциями. Можно ли сделать свою функцию, помимо встроенных? Не вопрос. Например, я пишу свою custom-функцию, которая будет говорить, мусор это или нет.
Сustom-функции (они также называются udf) надо зарегистрировать — заявить, как она называется. Потом ее в любой момент можно будет дергать по названию. Поэтому я своей функции даю название notGarbage. Мой класс, который имплементирует интерфейс udf1, принимает на входе string (слово), а на выходе — boolean (мусор или не мусор).
Смотрите, как я использую свою функцию:
Сервис, который принимает на вход датафрейм, сначала в своем PostConstruct просто регистрирует ее.
Далее я делаю callUDF и даю то же самое имя — вызывается моя функция. Вы можете писать свои функции — тогда у нас не будет ситуации, что контекст где-то сидит и сериализуется. Будет сериализоваться только udf-функция.
Конечно этот процесс регистрации UDF функий выглядит утомительным, но я надеюсь, что вы уже догадываетесь, что я могу придмать свою аннотацию @RegisterUDF и написать BPP который будет все эти функции регистрировать сам.
Давайте все же запустим анализ текстов песен и посмотрим, что у нас получается (у меня в этом проекте уже есть Tomcat, есть контроллер):
Вот 10 самых популярных слов Кэтти Пэри:
Если сравнить Бритни Спирс с Кэтти Пэри (мы это будем делать так):
Это простой сервис, которому указываем двух артистов и количество слов для сравнения. У Брити Спирс и Кэтти Пэри из 6 слов 4 общие.
А у Pink Floyd 0 общих слов с Бритни Спирс. Вот, что у нас здесь было:
Выводы
Лично встретиться с Женей в Москве можно будет уже совсем скоро:
— 5 апреля на Spark-тренинге. Тренинг будет похож на этот доклад, конечно, сильно дополненный и расширенный, но тем не менее. Так что если вы готовы копать Spark и разбираться с нюансами самостоятельно, он вам не сильно-то и нужен. А вот если решите, что хочется всего и сразу: и проектик за день написать, и все вопросы лично задать, и разобраться быстро и легко, — добро пожаловать на тренинг «Добро пожаловать в Spark!».
— 7-8 апреля на JPoint 2017. В этот раз он выступит с двумя докладами: «Spring – Глубоко и не очень» и «Проклятие Spring Test». После каждого из докладов в дискуссионной зоне можно будет устраивать допросы с пристрастием обо всем, что вам интересно!
Кроме этого, на JPoint есть целая куча крутых докладов практически обо всем из мира Java — обзор планируемых докладов мы давали в другом посте, а просто программу конференции вы найдете на сайте мероприятия.