commit 915850c1b01340afcf627988fcb127b7dcdd46ee Author: Stas Kim Date: Thu Feb 1 12:21:34 2024 +0300 Добавлено первое решение задания diff --git a/.bsp/sbt.json b/.bsp/sbt.json new file mode 100644 index 0000000..ec282b7 --- /dev/null +++ b/.bsp/sbt.json @@ -0,0 +1 @@ +{"name":"sbt","version":"1.9.8","bspVersion":"2.1.0-M1","languages":["scala"],"argv":["C:\\Program Files\\Eclipse Adoptium\\jdk-8.0.392.8-hotspot\\jre/bin/java","-Xms100m","-Xmx100m","-classpath","C:/Users/11/AppData/Roaming/JetBrains/IntelliJIdea2021.3/plugins/Scala/launcher/sbt-launch.jar","xsbt.boot.Boot","-bsp","--sbt-launch-jar=C:/Users/11/AppData/Roaming/JetBrains/IntelliJIdea2021.3/plugins/Scala/launcher/sbt-launch.jar"]} \ No newline at end of file diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..e5745f1 --- /dev/null +++ b/build.sbt @@ -0,0 +1,13 @@ +ThisBuild / version := "0.1.0-SNAPSHOT" +// https://mvnrepository.com/artifact/org.apache.spark/spark-core +libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.0" +// https://mvnrepository.com/artifact/org.apache.spark/spark-sql +libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided" +// https://mvnrepository.com/artifact/org.postgresql/postgresql +libraryDependencies += "org.postgresql" % "postgresql" % "42.7.1" +ThisBuild / scalaVersion := "2.13.12" + +lazy val root = (project in file(".")) + .settings( + name := "TECHDM_TRANSACTION" + ) diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..0aa5c39 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version = 1.9.8 diff --git a/src/main/resources/data/acct/_SUCCESS b/src/main/resources/data/acct/_SUCCESS new file mode 100644 index 0000000..e69de29 diff --git a/src/main/resources/data/acct/part-00000-6883a1c2-b42b-437c-96d5-15762033328a-c000.snappy.parquet b/src/main/resources/data/acct/part-00000-6883a1c2-b42b-437c-96d5-15762033328a-c000.snappy.parquet new file mode 100644 index 0000000..1c4ce75 Binary files /dev/null and b/src/main/resources/data/acct/part-00000-6883a1c2-b42b-437c-96d5-15762033328a-c000.snappy.parquet differ diff --git a/src/main/resources/data/acct_h/_SUCCESS b/src/main/resources/data/acct_h/_SUCCESS new file mode 100644 index 0000000..e69de29 diff --git a/src/main/resources/data/acct_h/part-00000-5ecc49e3-ce9c-4f36-a62f-0570f3c29630-c000.snappy.parquet b/src/main/resources/data/acct_h/part-00000-5ecc49e3-ce9c-4f36-a62f-0570f3c29630-c000.snappy.parquet new file mode 100644 index 0000000..7f241ce Binary files /dev/null and b/src/main/resources/data/acct_h/part-00000-5ecc49e3-ce9c-4f36-a62f-0570f3c29630-c000.snappy.parquet differ diff --git a/src/main/resources/data/crncy/_SUCCESS b/src/main/resources/data/crncy/_SUCCESS new file mode 100644 index 0000000..e69de29 diff --git a/src/main/resources/data/crncy/part-00000-f9e00d5e-88f1-4f9e-b5f8-8077e9202152-c000.snappy.parquet b/src/main/resources/data/crncy/part-00000-f9e00d5e-88f1-4f9e-b5f8-8077e9202152-c000.snappy.parquet new file mode 100644 index 0000000..9f1340b Binary files /dev/null and b/src/main/resources/data/crncy/part-00000-f9e00d5e-88f1-4f9e-b5f8-8077e9202152-c000.snappy.parquet differ diff --git a/src/main/resources/data/div_/_SUCCESS b/src/main/resources/data/div_/_SUCCESS new file mode 100644 index 0000000..e69de29 diff --git a/src/main/resources/data/div_/part-00000-711b71d0-68d4-40b5-b6c4-76050a546c2e-c000.snappy.parquet b/src/main/resources/data/div_/part-00000-711b71d0-68d4-40b5-b6c4-76050a546c2e-c000.snappy.parquet new file mode 100644 index 0000000..8fac40c Binary files /dev/null and b/src/main/resources/data/div_/part-00000-711b71d0-68d4-40b5-b6c4-76050a546c2e-c000.snappy.parquet differ diff --git a/src/main/resources/data/frsymb/_SUCCESS b/src/main/resources/data/frsymb/_SUCCESS new file mode 100644 index 0000000..e69de29 diff --git a/src/main/resources/data/frsymb/part-00000-96618fb3-d593-4e78-a1e1-2eb2ca8d1102-c000.snappy.parquet b/src/main/resources/data/frsymb/part-00000-96618fb3-d593-4e78-a1e1-2eb2ca8d1102-c000.snappy.parquet new file mode 100644 index 0000000..e816f4e Binary files /dev/null and b/src/main/resources/data/frsymb/part-00000-96618fb3-d593-4e78-a1e1-2eb2ca8d1102-c000.snappy.parquet differ diff --git a/src/main/resources/data/paydoc/_SUCCESS b/src/main/resources/data/paydoc/_SUCCESS new file mode 100644 index 0000000..e69de29 diff --git a/src/main/resources/data/paydoc/part-00000-179f9159-0a68-4b76-a8b3-9a205e9b4569-c000.snappy.parquet b/src/main/resources/data/paydoc/part-00000-179f9159-0a68-4b76-a8b3-9a205e9b4569-c000.snappy.parquet new file mode 100644 index 0000000..c1c4033 Binary files /dev/null and b/src/main/resources/data/paydoc/part-00000-179f9159-0a68-4b76-a8b3-9a205e9b4569-c000.snappy.parquet differ diff --git a/src/main/resources/data/txn/_SUCCESS b/src/main/resources/data/txn/_SUCCESS new file mode 100644 index 0000000..e69de29 diff --git a/src/main/resources/data/txn/part-00000-02fff546-1c15-4c28-821e-59e0de92feda-c000.snappy.parquet b/src/main/resources/data/txn/part-00000-02fff546-1c15-4c28-821e-59e0de92feda-c000.snappy.parquet new file mode 100644 index 0000000..10a1a8c Binary files /dev/null and b/src/main/resources/data/txn/part-00000-02fff546-1c15-4c28-821e-59e0de92feda-c000.snappy.parquet differ diff --git a/src/main/scala/task/task1.scala b/src/main/scala/task/task1.scala new file mode 100644 index 0000000..22dabf2 --- /dev/null +++ b/src/main/scala/task/task1.scala @@ -0,0 +1,165 @@ +package task + +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql._ + +import java.util.Properties + +object task1 extends App { + + def formatOfr(value: String, account: String): String = { + def formatSubstring(str: String): String = + if (str.endsWith("00")) str.substring(0, 5) + else s"${str.substring(0, 5)}.${str.substring(5, 7)}" + + Option(value).collect { + case v if v.length == 7 => formatSubstring(v) + case v if v.length == 5 || v.contains('.') => v + }.getOrElse(formatSubstring(account.substring(13))) + } + + val formatOfrUDF = udf(formatOfr _) + + def processDate(date: String, dtAccount: String, ktAccount: String): String = { + if (dtAccount.startsWith("707") || ktAccount.startsWith("707")) { + val originalDate = java.time.LocalDate.parse(date) + val adjustedDate = originalDate.minusYears(1) + adjustedDate.toString + } else date + } + + val processDateUDF = udf(processDate _) + + def makeTechdmTransaction(dataProductInput: String, startDate: Option[Column] = None, endDate: Option[Column] = None) = { + val spark = SparkSession.builder.master("local").appName("Task1").getOrCreate() + + val acct = spark.read.parquet("src/main/resources/data/acct/part-00000-6883a1c2-b42b-437c-96d5-15762033328a-c000.snappy.parquet") + val acctH = spark.read.parquet("src/main/resources/data/acct_h/part-00000-5ecc49e3-ce9c-4f36-a62f-0570f3c29630-c000.snappy.parquet") + val crncy = spark.read.parquet("src/main/resources/data/crncy/part-00000-f9e00d5e-88f1-4f9e-b5f8-8077e9202152-c000.snappy.parquet") + val div = spark.read.parquet("src/main/resources/data/div_/part-00000-711b71d0-68d4-40b5-b6c4-76050a546c2e-c000.snappy.parquet") + val frsymb = spark.read.parquet("src/main/resources/data/frsymb/part-00000-96618fb3-d593-4e78-a1e1-2eb2ca8d1102-c000.snappy.parquet") + val paydoc = spark.read.parquet("src/main/resources/data/paydoc/part-00000-179f9159-0a68-4b76-a8b3-9a205e9b4569-c000.snappy.parquet") + val txn = spark.read.parquet("src/main/resources/data/txn/part-00000-02fff546-1c15-4c28-821e-59e0de92feda-c000.snappy.parquet") + + val pgConnectionProperties = new Properties() + pgConnectionProperties.put("user", "postgres") + pgConnectionProperties.put("password", "12345678") + val dbString = "jdbc:postgresql://localhost:5432/task1" + + val dataProduct = spark.read.jdbc(dbString, "public.data_product", pgConnectionProperties) + val techdmSettingTransactionFilter = spark.read.jdbc(dbString, "public.techdm_setting_transaction_filter", pgConnectionProperties) + + val productRow = dataProduct.filter(dataProduct("dp_code") === dataProductInput).collect()(0) + val asnuCodeFk = productRow.getAs[String]("asnu_code_fk") + val transactionDepth = productRow.getAs[Int]("transaction_depth_update") + + def checkIfDatesAreSet(start_dt: Option[Column], end_dt: Option[Column]): Boolean = start_dt.isDefined && end_dt.isDefined + + val tmpMaxDate = lit(paydoc.select(max(paydoc("input_dttm"))).first().getAs[TimestampType](0)) + + val currentDate = current_timestamp() + + val finalStartDate: Column = if (checkIfDatesAreSet(startDate, endDate)) startDate.get + else if (tmpMaxDate == null) date_sub(currentDate, transactionDepth) + else least(date_sub(tmpMaxDate, transactionDepth), date_sub(currentDate, transactionDepth)) + + val finalEndDate = if (checkIfDatesAreSet(startDate, endDate)) endDate.get else date_sub(currentDate, 1) + + val transactionFilters = techdmSettingTransactionFilter.filter(techdmSettingTransactionFilter("dp_code_fk") === dataProductInput) + val dtFilter = transactionFilters.select(col("dt_account_mask_list")) + .filter(col("dt_account_mask_list").isNotNull) + .collect + .map(_.getString(0)) + .flatMap(_.split(",\\s")) + .map(mask => col("num").like(mask)) + .reduce(_ or _) + val ktFilter = transactionFilters.select(col("kt_account_mask_list")) + .filter(col("kt_account_mask_list").isNotNull) + .collect + .map(_.getString(0)) + .flatMap(_.split(",\\s")) + .map(mask => col("num").like(mask)) + .reduce(_ or _) + + //Отбор данных + val filteredPaydoc = paydoc.filter(finalStartDate <= paydoc("input_dttm") && paydoc("input_dttm") <= finalEndDate && paydoc("paydoc_dp_code") === dataProductInput) + + //полупроводки по дт и кт + val dtPaydoc = filteredPaydoc.join(txn, filteredPaydoc("paydoc_sid") === txn("paydoc_sid") && filteredPaydoc("paydoc_dp_code") === txn("paydoc_dp_code") && txn("cred_flag") === false).drop(txn("input_dttm")) + val ktPaydoc = filteredPaydoc.join(txn, filteredPaydoc("paydoc_sid") === txn("paydoc_sid") && filteredPaydoc("paydoc_dp_code") === txn("paydoc_dp_code") && txn("cred_flag") === true).drop(txn("input_dttm")) + + //данные по счетам + val dtAccount = dtPaydoc.join(acct, acct("acct_sk") === txn("acct_sk") && acct("acct_dp_code") === txn("acct_dp_code")).drop(acct("acct_sk"), txn("acct_dp_code")).filter(dtFilter) + val ktAccount = ktPaydoc.join(acct, acct("acct_sk") === txn("acct_sk") && acct("acct_dp_code") === txn("acct_dp_code")).drop(acct("acct_sk"), txn("acct_dp_code")).filter(ktFilter) + + //данные по валюте + val dtCur = dtAccount.join(crncy, crncy("crncy_sk") === dtAccount("crncy_sk") && crncy("crncy_dp_code") === dtAccount("crncy_dp_code")) + val ktCur = ktAccount.join(crncy, crncy("crncy_sk") === ktAccount("crncy_sk") && crncy("crncy_dp_code") === ktAccount("crncy_dp_code")) + + //филиал + val dtFilial = dtCur.join(acctH, acctH("acct_sk") === dtCur("acct_sk") && acctH("acct_dp_code") === dtCur("acct_dp_code") && acctH("end_dt") >= dtCur("input_dttm"), "left") + val ktFilial = ktCur.join(acctH, acctH("acct_sk") === ktCur("acct_sk") && acctH("acct_dp_code") === ktCur("acct_dp_code") && acctH("end_dt") >= ktCur("input_dttm"), "left") + + val dtFilialWithDiv = dtFilial.join(div, div("div_sk") === dtFilial("bal_div_sk") && div("div_dp_code") === dtFilial("bal_div_dp_code"), "left") + val ktFilialWithDiv = ktFilial.join(div, div("div_sk") === ktFilial("bal_div_sk") && div("div_dp_code") === ktFilial("bal_div_dp_code"), "left") + + //Символы ОФР + val dtFrSymb = dtFilialWithDiv.join(frsymb, frsymb("frsymb_sk") === dtFilialWithDiv("frsymb_sk") && frsymb("frsymb_dp_code") === dtFilialWithDiv("frsymb_dp_code"), "left") + val ktFrSymb = ktFilialWithDiv.join(frsymb, frsymb("frsymb_sk") === ktFilialWithDiv("frsymb_sk") && frsymb("frsymb_dp_code") === ktFilialWithDiv("frsymb_dp_code"), "left") + + //сборка дт и кт + val dtFull = dtFrSymb + .withColumn("asnu_code_fk", lit(asnuCodeFk)) + .withColumn("dp_code_fk", lit(dataProductInput)) + .select(filteredPaydoc("paydoc_sid") as "transaction_ek", + filteredPaydoc("input_dttm") as "transaction_date", + to_date(filteredPaydoc("input_dttm")) as "input_dt", + to_date(filteredPaydoc("bal_dttm")) as "value_date", + filteredPaydoc("doc_num") as "mem_order_number", + filteredPaydoc("doc_dt") as "mem_order_date", + acct("sid") as "dt_account_fk", + acct("acct_dp_code") as "dt_account_dp_code", + acct("num") as "dt_account", + crncy("sid") as "dt_currency_fk", + crncy("crncy_dp_code") as "dt_currency_dp_code", + crncy("iso_code") as "dt_currency_alfa_code", + crncy("digital_code") as "dt_currency_digital_code", + txn("sum_act_amt") as "dt_amount", + txn("sum_src_bal_amt") as "amount_rub", + div("sid") as "dt_filial_fk", + div("div_dp_code") as "dt_filial_dp_code", + div("code") as "dt_filial_code", + formatOfrUDF(frsymb("num"), acct("num")) as "dt_ofr", + filteredPaydoc("doc_purp_txt") as "payment_purpose", + filteredPaydoc("ctl_validfrom") as "ctl_validfrom_ts" + ) + val ktFull = ktFrSymb + .withColumn("asnu_code_fk", lit(asnuCodeFk)) + .withColumn("dp_code_fk", lit(dataProductInput)) + .select(filteredPaydoc("paydoc_sid") as "transaction_ek", + acct("sid") as "kt_account_fk", + acct("acct_dp_code") as "kt_account_dp_code", + acct("num") as "kt_account", + crncy("sid") as "kt_currency_fk", + crncy("crncy_dp_code") as "kt_currency_dp_code", + crncy("iso_code") as "kt_currency_alfa_code", + crncy("digital_code") as "kt_currency_digital_code", + txn("sum_act_amt") as "kt_amount", + div("sid") as "kt_filial_fk", + div("div_dp_code") as "kt_filial_dp_code", + div("code") as "kt_filial_code", + formatOfrUDF(frsymb("num"), acct("num")) as "kt_ofr" + ) + //итоговый dataframe + val techdmSrcTransaction = dtFull.join(ktFull, dtFull("transaction_ek") === ktFull("transaction_ek")).drop(ktFull("transaction_ek")) + .withColumn("operation_date", to_date(processDateUDF(col("value_date"), col("dt_account"), col("kt_account")) as "dt_operation_date")) + techdmSrcTransaction.show() + //techdmSrcTransaction.write.mode("overwrite").save("src/main/resources/data/result") + techdmSrcTransaction.write.mode("overwrite").jdbc(dbString, "techdm_transaction", pgConnectionProperties) + spark.stop() + } + + + makeTechdmTransaction("ndl_eks_gl", Option(to_date(lit("2015-03-06"))), Option(to_date(lit("2024-03-06")))) +}