Programing

Spark 코드 구성 및 모범 사례

lottogame 2020. 11. 19. 07:43
반응형

Spark 코드 구성 및 모범 사례


따라서 코드 재사용, 디자인 패턴 및 모범 사례를 항상 고려하는 객체 지향 세계에서 수년을 보내면서 Spark의 세계에서 코드 구성 및 코드 재사용에 다소 어려움을 겪고 있습니다.

재사용 가능한 방식으로 코드를 작성하려고하면 거의 항상 성능 비용이 발생하고 특정 사용 사례에 가장 적합한 것으로 다시 작성하게됩니다. "이 특정 사용 사례에 가장 적합한 것을 작성"하는이 상수는 코드 구성에도 영향을줍니다. 왜냐하면 "모든 것이 실제로 함께 속할"때 코드를 다른 객체 나 모듈로 분할하는 것이 어렵 기 때문에 long을 포함하는 "God"객체가 거의 없기 때문입니다. 복잡한 변형의 사슬. 사실, 객체 지향 세계에서 작업 할 때 지금 작성하고있는 대부분의 Spark 코드를 살펴 보았다면 "스파게티 코드"라고 생각하고 무시했을 것입니다.

나는 객체 지향 세계의 모범 사례와 동등한 것을 찾으려고 인터넷을 서핑했지만 많은 운이 없었습니다. 함수형 프로그래밍에 대한 "모범 사례"를 찾을 수 있지만 Spark는 여기에서 성능이 매우 중요한 요소이기 때문에 추가 레이어를 추가합니다.

제 질문은 여러분이 추천 할 수있는 Spark 코드 작성에 대한 모범 사례를 발견 한 Spark 전문가가 있습니까?

편집하다

댓글에 쓰여진 것처럼, 나는 실제로 누군가 가이 문제 해결 하는 방법에 대한 답변을 게시 할 것이라고 기대하지 않았지만 ,이 커뮤니티의 누군가가 어딘가에 글이나 블로그 게시물을 작성한 Martin Fowler 유형을 발견했으면합니다. Spark 세계에서 코드 구성 문제를 해결하는 방법에 대해 설명합니다.

@DanielDarabos는 코드 구성과 성능이 충돌하는 상황을 예로 들어 보겠다고 제안했습니다. 일상적인 작업에서이 문제가 자주 발생하는 것을 알지만, 좋은 최소한의 예로 요약하는 것이 조금 어렵지만 시도해 보겠습니다.

객체 지향 세계에서 저는 Single Responsibility Principle의 열렬한 팬이므로 내 방법이 한 가지에만 책임이 있는지 확인합니다. 재사용 가능하고 쉽게 테스트 할 수 있습니다. 따라서 목록에있는 일부 숫자의 합계를 계산해야하고 (일부 기준과 일치) 동일한 숫자의 평균을 계산해야한다면 가장 확실하게 두 가지 방법, 즉 합계를 계산하는 방법과 평균을 계산했습니다. 이렇게 :

def main(implicit args: Array[String]): Unit = {
  val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))

  println("Summed weights for DK = " + summedWeights(list, "DK")
  println("Averaged weights for DK = " + averagedWeights(list, "DK")
}

def summedWeights(list: List, country: String): Double = {
  list.filter(_._1 == country).map(_._2).sum
}

def averagedWeights(list: List, country: String): Double = {
  val filteredByCountry = list.filter(_._1 == country) 
  filteredByCountry.map(_._2).sum/ filteredByCountry.length
}

물론 Spark에서 SRP를 계속 존중할 수 있습니다.

def main(implicit args: Array[String]): Unit = {
  val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight")

  println("Summed weights for DK = " + summedWeights(df, "DK")
  println("Averaged weights for DK = " + averagedWeights(df, "DK")
}


def avgWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(avg('weight))

  summedWeight.first().getDouble(0)
}

def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(sum('weight))

  summedWeight.first().getDouble(0)
}

But because my df may contain billions of rows I would rather not have to perform the filter twice. In fact, performance is directly coupled to EMR cost, so I REALLY don't want that. To overcome it, I thus decide to violate SRP and simply put the two functions in one and make sure I call persist on the country-filtered DataFrame, like this:

def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER)
  val summedWeights = countrySpecific.agg(sum('weight)).first().getDouble(0)
  val averagedWeights = summedWeights / countrySpecific.count()

  (summedWeights, averagedWeights)
}

Now, this example if of course a huge simplification of what's encountered in real life. Here I could simply solve it by filtering and persisting df before handing it to the sum and avg functions (which would also be more SRP), but in real life there may be a number of intermediate calculations going on that are needed again and again. In other words, the filter function here is merely an attempt to make a simple example of something that will benefit from being persisted. In fact, I think calls to persist is a keyword here. Calling persist will vastly speed up my job, but the cost is that I have to tightly couple all code that depends on the persisted DataFrame - even if they are logically separate.


I think you can subscribe Apache Spark, databricks channel on youtube, listen more and know more, especially for the experiences and lessons from others.

here is some videos recommended:

and I've posted and still updating it on my github and blog:

hope this can help you ~

참고URL : https://stackoverflow.com/questions/32777014/spark-code-organization-and-best-practices

반응형