Added the first solution to the task

master
Stas Kim 2 years ago
commit 915850c1b0
  18 changed files (+N = lines added, BIN = binary file, 0 = empty file):
  1. .bsp/sbt.json (+1)
  2. build.sbt (+13)
  3. project/build.properties (+1)
  4. src/main/resources/data/acct/_SUCCESS (0)
  5. src/main/resources/data/acct/part-00000-6883a1c2-b42b-437c-96d5-15762033328a-c000.snappy.parquet (BIN)
  6. src/main/resources/data/acct_h/_SUCCESS (0)
  7. src/main/resources/data/acct_h/part-00000-5ecc49e3-ce9c-4f36-a62f-0570f3c29630-c000.snappy.parquet (BIN)
  8. src/main/resources/data/crncy/_SUCCESS (0)
  9. src/main/resources/data/crncy/part-00000-f9e00d5e-88f1-4f9e-b5f8-8077e9202152-c000.snappy.parquet (BIN)
  10. src/main/resources/data/div_/_SUCCESS (0)
  11. src/main/resources/data/div_/part-00000-711b71d0-68d4-40b5-b6c4-76050a546c2e-c000.snappy.parquet (BIN)
  12. src/main/resources/data/frsymb/_SUCCESS (0)
  13. src/main/resources/data/frsymb/part-00000-96618fb3-d593-4e78-a1e1-2eb2ca8d1102-c000.snappy.parquet (BIN)
  14. src/main/resources/data/paydoc/_SUCCESS (0)
  15. src/main/resources/data/paydoc/part-00000-179f9159-0a68-4b76-a8b3-9a205e9b4569-c000.snappy.parquet (BIN)
  16. src/main/resources/data/txn/_SUCCESS (0)
  17. src/main/resources/data/txn/part-00000-02fff546-1c15-4c28-821e-59e0de92feda-c000.snappy.parquet (BIN)
  18. src/main/scala/task/task1.scala (+165)

.bsp/sbt.json
@@ -0,0 +1 @@
{"name":"sbt","version":"1.9.8","bspVersion":"2.1.0-M1","languages":["scala"],"argv":["C:\\Program Files\\Eclipse Adoptium\\jdk-8.0.392.8-hotspot\\jre/bin/java","-Xms100m","-Xmx100m","-classpath","C:/Users/11/AppData/Roaming/JetBrains/IntelliJIdea2021.3/plugins/Scala/launcher/sbt-launch.jar","xsbt.boot.Boot","-bsp","--sbt-launch-jar=C:/Users/11/AppData/Roaming/JetBrains/IntelliJIdea2021.3/plugins/Scala/launcher/sbt-launch.jar"]}

build.sbt
@@ -0,0 +1,13 @@
ThisBuild / version := "0.1.0-SNAPSHOT"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided"
// https://mvnrepository.com/artifact/org.postgresql/postgresql
libraryDependencies += "org.postgresql" % "postgresql" % "42.7.1"
ThisBuild / scalaVersion := "2.13.12"
lazy val root = (project in file("."))
  .settings(
    name := "TECHDM_TRANSACTION"
  )
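
A note on the "provided" scope: spark-sql is excluded from the runtime classpath that sbt run (and an IDE run delegated to sbt) uses, even though the application itself starts a local SparkSession. One common workaround, shown here only as a sketch and not part of this commit, is to re-wire run to the compile classpath, which does include provided dependencies:

Compile / run := Defaults.runTask(
  Compile / fullClasspath,
  Compile / run / mainClass,
  Compile / run / runner
).evaluated

Alternatively, the "provided" qualifier can simply be dropped for local development.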

project/build.properties
@@ -0,0 +1 @@
sbt.version = 1.9.8

src/main/scala/task/task1.scala
@@ -0,0 +1,165 @@
package task

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._

import java.util.Properties

object task1 extends App {

  def formatOfr(value: String, account: String): String = {
    def formatSubstring(str: String): String =
      if (str.endsWith("00")) str.substring(0, 5)
      else s"${str.substring(0, 5)}.${str.substring(5, 7)}"

    Option(value).collect {
      case v if v.length == 7                    => formatSubstring(v)
      case v if v.length == 5 || v.contains('.') => v
    }.getOrElse(formatSubstring(account.substring(13)))
  }
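  // Illustrative behaviour (sample values are assumed, not taken from the data):
  //   formatOfr("7060100", acct) == "70601"    - a trailing "00" is dropped
  //   formatOfr("7060112", acct) == "70601.12" - otherwise split as NNNNN.NN
  //   formatOfr("70601",   acct) == "70601"    - 5-digit or dotted values pass through
  //   formatOfr(null, acct) falls back to the symbol encoded in the account number, account.substring(13)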
  val formatOfrUDF = udf(formatOfr _)

  def processDate(date: String, dtAccount: String, ktAccount: String): String = {
    if (dtAccount.startsWith("707") || ktAccount.startsWith("707")) {
      val originalDate = java.time.LocalDate.parse(date)
      val adjustedDate = originalDate.minusYears(1)
      adjustedDate.toString
    } else date
  }

  val processDateUDF = udf(processDate _)
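  // When either side of a posting is a 707* account (presumably the prior-year /
  // events-after-the-reporting-date accounts), the date is shifted back one year,
  // e.g. (assumed values) processDate("2024-01-15", "70701...", "40702...") == "2023-01-15"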
  def makeTechdmTransaction(dataProductInput: String,
                            startDate: Option[Column] = None,
                            endDate: Option[Column] = None): Unit = {
    val spark = SparkSession.builder.master("local").appName("Task1").getOrCreate()

    // Source parquet snapshots shipped with the repository
    val acct   = spark.read.parquet("src/main/resources/data/acct/part-00000-6883a1c2-b42b-437c-96d5-15762033328a-c000.snappy.parquet")
    val acctH  = spark.read.parquet("src/main/resources/data/acct_h/part-00000-5ecc49e3-ce9c-4f36-a62f-0570f3c29630-c000.snappy.parquet")
    val crncy  = spark.read.parquet("src/main/resources/data/crncy/part-00000-f9e00d5e-88f1-4f9e-b5f8-8077e9202152-c000.snappy.parquet")
    val div    = spark.read.parquet("src/main/resources/data/div_/part-00000-711b71d0-68d4-40b5-b6c4-76050a546c2e-c000.snappy.parquet")
    val frsymb = spark.read.parquet("src/main/resources/data/frsymb/part-00000-96618fb3-d593-4e78-a1e1-2eb2ca8d1102-c000.snappy.parquet")
    val paydoc = spark.read.parquet("src/main/resources/data/paydoc/part-00000-179f9159-0a68-4b76-a8b3-9a205e9b4569-c000.snappy.parquet")
    val txn    = spark.read.parquet("src/main/resources/data/txn/part-00000-02fff546-1c15-4c28-821e-59e0de92feda-c000.snappy.parquet")

    // Reference tables in PostgreSQL
    val pgConnectionProperties = new Properties()
    pgConnectionProperties.put("user", "postgres")
    pgConnectionProperties.put("password", "12345678")
    val dbString = "jdbc:postgresql://localhost:5432/task1"
    val dataProduct = spark.read.jdbc(dbString, "public.data_product", pgConnectionProperties)
    val techdmSettingTransactionFilter = spark.read.jdbc(dbString, "public.techdm_setting_transaction_filter", pgConnectionProperties)

    // Parameters of the requested data product
    val productRow = dataProduct.filter(dataProduct("dp_code") === dataProductInput).collect()(0)
    val asnuCodeFk = productRow.getAs[String]("asnu_code_fk")
    val transactionDepth = productRow.getAs[Int]("transaction_depth_update")
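    // Illustrative content of productRow (assumed values): dp_code = "ndl_eks_gl",
    // asnu_code_fk = an ASNU identifier copied into the output, transaction_depth_update = 10
    // (the reload depth, in days, used for the default load window below)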
    def checkIfDatesAreSet(start_dt: Option[Column], end_dt: Option[Column]): Boolean =
      start_dt.isDefined && end_dt.isDefined

    // Latest already-loaded timestamp as a plain value (null when paydoc is empty),
    // kept unwrapped so that the null check below actually works
    val tmpMaxDate = paydoc.select(max(paydoc("input_dttm"))).first().getTimestamp(0)
    val currentDate = current_timestamp()

    val finalStartDate: Column =
      if (checkIfDatesAreSet(startDate, endDate)) startDate.get
      else if (tmpMaxDate == null) date_sub(currentDate, transactionDepth)
      else least(date_sub(lit(tmpMaxDate), transactionDepth), date_sub(currentDate, transactionDepth))

    val finalEndDate = if (checkIfDatesAreSet(startDate, endDate)) endDate.get else date_sub(currentDate, 1)
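    // Worked example (assumed values): transaction_depth_update = 10, current date 2024-03-06,
    // max(input_dttm) = 2024-03-01  =>  load window = [least(2024-02-20, 2024-02-25), 2024-03-05]
    //                                               = [2024-02-20, 2024-03-05]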
    // Account-number masks configured for this data product, turned into a LIKE predicate
    // (assumes at least one mask is configured for each side)
    val transactionFilters = techdmSettingTransactionFilter.filter(techdmSettingTransactionFilter("dp_code_fk") === dataProductInput)

    val dtFilter = transactionFilters.select(col("dt_account_mask_list"))
      .filter(col("dt_account_mask_list").isNotNull)
      .collect
      .map(_.getString(0))
      .flatMap(_.split(",\\s*")) // comma-separated list, optional whitespace after the comma
      .map(mask => col("num").like(mask))
      .reduce(_ or _)

    val ktFilter = transactionFilters.select(col("kt_account_mask_list"))
      .filter(col("kt_account_mask_list").isNotNull)
      .collect
      .map(_.getString(0))
      .flatMap(_.split(",\\s*"))
      .map(mask => col("num").like(mask))
      .reduce(_ or _)
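    // E.g. (assumed mask list) dt_account_mask_list = "40702%, 70601%" becomes
    // col("num").like("40702%") or col("num").like("70601%")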
    // Select payment documents inside the load window for this data product
    val filteredPaydoc = paydoc.filter(finalStartDate <= paydoc("input_dttm") && paydoc("input_dttm") <= finalEndDate && paydoc("paydoc_dp_code") === dataProductInput)

    // Debit and credit half-postings
    val dtPaydoc = filteredPaydoc.join(txn, filteredPaydoc("paydoc_sid") === txn("paydoc_sid") && filteredPaydoc("paydoc_dp_code") === txn("paydoc_dp_code") && txn("cred_flag") === false).drop(txn("input_dttm"))
    val ktPaydoc = filteredPaydoc.join(txn, filteredPaydoc("paydoc_sid") === txn("paydoc_sid") && filteredPaydoc("paydoc_dp_code") === txn("paydoc_dp_code") && txn("cred_flag") === true).drop(txn("input_dttm"))

    // Account data, restricted by the configured masks
    val dtAccount = dtPaydoc.join(acct, acct("acct_sk") === txn("acct_sk") && acct("acct_dp_code") === txn("acct_dp_code")).drop(acct("acct_sk"), txn("acct_dp_code")).filter(dtFilter)
    val ktAccount = ktPaydoc.join(acct, acct("acct_sk") === txn("acct_sk") && acct("acct_dp_code") === txn("acct_dp_code")).drop(acct("acct_sk"), txn("acct_dp_code")).filter(ktFilter)

    // Currency data
    val dtCur = dtAccount.join(crncy, crncy("crncy_sk") === dtAccount("crncy_sk") && crncy("crncy_dp_code") === dtAccount("crncy_dp_code"))
    val ktCur = ktAccount.join(crncy, crncy("crncy_sk") === ktAccount("crncy_sk") && crncy("crncy_dp_code") === ktAccount("crncy_dp_code"))

    // Branch (filial): historical account-to-division assignment, then the division itself
    val dtFilial = dtCur.join(acctH, acctH("acct_sk") === dtCur("acct_sk") && acctH("acct_dp_code") === dtCur("acct_dp_code") && acctH("end_dt") >= dtCur("input_dttm"), "left")
    val ktFilial = ktCur.join(acctH, acctH("acct_sk") === ktCur("acct_sk") && acctH("acct_dp_code") === ktCur("acct_dp_code") && acctH("end_dt") >= ktCur("input_dttm"), "left")
    val dtFilialWithDiv = dtFilial.join(div, div("div_sk") === dtFilial("bal_div_sk") && div("div_dp_code") === dtFilial("bal_div_dp_code"), "left")
    val ktFilialWithDiv = ktFilial.join(div, div("div_sk") === ktFilial("bal_div_sk") && div("div_dp_code") === ktFilial("bal_div_dp_code"), "left")

    // OFR (financial result report) symbols
    val dtFrSymb = dtFilialWithDiv.join(frsymb, frsymb("frsymb_sk") === dtFilialWithDiv("frsymb_sk") && frsymb("frsymb_dp_code") === dtFilialWithDiv("frsymb_dp_code"), "left")
    val ktFrSymb = ktFilialWithDiv.join(frsymb, frsymb("frsymb_sk") === ktFilialWithDiv("frsymb_sk") && frsymb("frsymb_dp_code") === ktFilialWithDiv("frsymb_dp_code"), "left")
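    // acct_h supplies the division the account belonged to at posting time (end_dt >= input_dttm);
    // acct_h, div and frsymb are joined with "left" so postings without branch history,
    // a division or an OFR symbol are still kept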
    // Assemble the debit and credit sides
    val dtFull = dtFrSymb
      .withColumn("asnu_code_fk", lit(asnuCodeFk))
      .withColumn("dp_code_fk", lit(dataProductInput))
      .select(
        filteredPaydoc("paydoc_sid") as "transaction_ek",
        filteredPaydoc("input_dttm") as "transaction_date",
        to_date(filteredPaydoc("input_dttm")) as "input_dt",
        to_date(filteredPaydoc("bal_dttm")) as "value_date",
        filteredPaydoc("doc_num") as "mem_order_number",
        filteredPaydoc("doc_dt") as "mem_order_date",
        acct("sid") as "dt_account_fk",
        acct("acct_dp_code") as "dt_account_dp_code",
        acct("num") as "dt_account",
        crncy("sid") as "dt_currency_fk",
        crncy("crncy_dp_code") as "dt_currency_dp_code",
        crncy("iso_code") as "dt_currency_alfa_code",
        crncy("digital_code") as "dt_currency_digital_code",
        txn("sum_act_amt") as "dt_amount",
        txn("sum_src_bal_amt") as "amount_rub",
        div("sid") as "dt_filial_fk",
        div("div_dp_code") as "dt_filial_dp_code",
        div("code") as "dt_filial_code",
        formatOfrUDF(frsymb("num"), acct("num")) as "dt_ofr",
        filteredPaydoc("doc_purp_txt") as "payment_purpose",
        filteredPaydoc("ctl_validfrom") as "ctl_validfrom_ts"
      )

    val ktFull = ktFrSymb
      .withColumn("asnu_code_fk", lit(asnuCodeFk))
      .withColumn("dp_code_fk", lit(dataProductInput))
      .select(
        filteredPaydoc("paydoc_sid") as "transaction_ek",
        acct("sid") as "kt_account_fk",
        acct("acct_dp_code") as "kt_account_dp_code",
        acct("num") as "kt_account",
        crncy("sid") as "kt_currency_fk",
        crncy("crncy_dp_code") as "kt_currency_dp_code",
        crncy("iso_code") as "kt_currency_alfa_code",
        crncy("digital_code") as "kt_currency_digital_code",
        txn("sum_act_amt") as "kt_amount",
        div("sid") as "kt_filial_fk",
        div("div_dp_code") as "kt_filial_dp_code",
        div("code") as "kt_filial_code",
        formatOfrUDF(frsymb("num"), acct("num")) as "kt_ofr"
      )
    // Final dataframe: join the two halves of each posting and derive the operation date
    val techdmSrcTransaction = dtFull
      .join(ktFull, dtFull("transaction_ek") === ktFull("transaction_ek"))
      .drop(ktFull("transaction_ek"))
      .withColumn("operation_date", to_date(processDateUDF(col("value_date"), col("dt_account"), col("kt_account"))))

    techdmSrcTransaction.show()
    // techdmSrcTransaction.write.mode("overwrite").save("src/main/resources/data/result")
    techdmSrcTransaction.write.mode("overwrite").jdbc(dbString, "techdm_transaction", pgConnectionProperties)

    spark.stop()
  }

  makeTechdmTransaction("ndl_eks_gl", Option(to_date(lit("2015-03-06"))), Option(to_date(lit("2024-03-06"))))
}
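
For a routine incremental run the explicit boundaries would presumably be omitted, letting the depth-based window computed inside the method apply; a minimal sketch with the same dp code:

  makeTechdmTransaction("ndl_eks_gl")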