Compare commits
No commits in common. 'master' and 'main' have entirely different histories.
@ -1 +0,0 @@ |
|||||||
{"name":"sbt","version":"1.9.8","bspVersion":"2.1.0-M1","languages":["scala"],"argv":["C:\\Program Files\\Eclipse Adoptium\\jdk-8.0.392.8-hotspot\\jre/bin/java","-Xms100m","-Xmx100m","-classpath","C:/Users/11/AppData/Roaming/JetBrains/IntelliJIdea2021.3/plugins/Scala/launcher/sbt-launch.jar","xsbt.boot.Boot","-bsp","--sbt-launch-jar=C:/Users/11/AppData/Roaming/JetBrains/IntelliJIdea2021.3/plugins/Scala/launcher/sbt-launch.jar"]} |
|
||||||
@ -0,0 +1,21 @@ |
|||||||
|
# ---> SBT |
||||||
|
# Simple Build Tool |
||||||
|
# http://www.scala-sbt.org/release/docs/Getting-Started/Directories.html#configuring-version-control |
||||||
|
|
||||||
|
dist/* |
||||||
|
target/ |
||||||
|
lib_managed/ |
||||||
|
src_managed/ |
||||||
|
project/boot/ |
||||||
|
project/plugins/project/ |
||||||
|
.history |
||||||
|
.cache |
||||||
|
.lib/ |
||||||
|
|
||||||
|
# ---> Scala |
||||||
|
*.class |
||||||
|
*.log |
||||||
|
|
||||||
|
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml |
||||||
|
hs_err_pid* |
||||||
|
|
||||||
@ -1,13 +0,0 @@ |
|||||||
// Build definition for the TECHDM_TRANSACTION Spark job.
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.13.12"

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.0"

// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
// FIX: was `% "provided"`. This job runs locally (SparkSession with
// master("local"), started via `sbt run` / the IDE), where provided-scope
// artifacts are absent from the runtime classpath and every
// org.apache.spark.sql.* class fails to load. Keep spark-sql on the
// default (compile) classpath; switch back to "provided" only if the jar
// is ever submitted to a cluster via spark-submit.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.0"

// https://mvnrepository.com/artifact/org.postgresql/postgresql
libraryDependencies += "org.postgresql" % "postgresql" % "42.7.1"

lazy val root = (project in file("."))
  .settings(
    name := "TECHDM_TRANSACTION"
  )
|
||||||
@ -1 +0,0 @@ |
|||||||
sbt.version = 1.9.8 |
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -1,165 +0,0 @@ |
|||||||
package task |
|
||||||
|
|
||||||
import org.apache.spark.sql.functions._ |
|
||||||
import org.apache.spark.sql.types._ |
|
||||||
import org.apache.spark.sql._ |
|
||||||
|
|
||||||
import java.util.Properties |
|
||||||
|
|
||||||
// NOTE(review): `extends App` relies on DelayedInit and makes the UDF vals part
// of object initialisation order; a plain `def main(args: Array[String])` would
// be safer, but is kept as-is to preserve the entry-point interface.
object task1 extends App {

  /**
   * Normalises an OFR (income/expense report) symbol to the canonical
   * "NNNNN" or "NNNNN.NN" form.
   *
   * @param value   raw symbol from the frsymb dimension; may be null for rows
   *                produced by the left join
   * @param account account number used as a fallback source of the symbol
   *                (digits from position 13 onwards — assumes the account
   *                number is at least 20 chars; TODO confirm against acct data)
   * @return the formatted symbol
   */
  def formatOfr(value: String, account: String): String = {
    // "NNNNN00" collapses to "NNNNN"; otherwise insert the decimal point
    // between positions 5 and 7.
    def formatSubstring(str: String): String =
      if (str.endsWith("00")) str.substring(0, 5)
      else s"${str.substring(0, 5)}.${str.substring(5, 7)}"

    Option(value).collect {
      case v if v.length == 7                    => formatSubstring(v)
      case v if v.length == 5 || v.contains('.') => v // already canonical
    }.getOrElse(formatSubstring(account.substring(13)))
  }

  // Column-level wrapper so formatOfr can be used inside DataFrame selects.
  val formatOfrUDF = udf(formatOfr _)

  /**
   * Shifts the date one year back for postings that touch a 707* account
   * (year-end profit/loss accounts — business rule); all other dates pass
   * through unchanged.
   *
   * @param date      ISO-8601 date string (Spark casts the DateType column)
   * @param dtAccount debit-side account number
   * @param ktAccount credit-side account number
   */
  def processDate(date: String, dtAccount: String, ktAccount: String): String = {
    if (dtAccount.startsWith("707") || ktAccount.startsWith("707"))
      java.time.LocalDate.parse(date).minusYears(1).toString
    else date
  }

  val processDateUDF = udf(processDate _)

  /**
   * Assembles the techdm_transaction data mart for one data product and
   * writes it to PostgreSQL (table "techdm_transaction", mode overwrite).
   *
   * @param dataProductInput data-product code (dp_code) to process
   * @param startDate        optional explicit lower bound on paydoc.input_dttm;
   *                         honoured only when endDate is also supplied
   * @param endDate          optional explicit upper bound on paydoc.input_dttm
   */
  def makeTechdmTransaction(dataProductInput: String, startDate: Option[Column] = None, endDate: Option[Column] = None) = {
    val spark = SparkSession.builder.master("local").appName("Task1").getOrCreate()

    // Source fact/dimension extracts (local parquet snapshots).
    val acct = spark.read.parquet("src/main/resources/data/acct/part-00000-6883a1c2-b42b-437c-96d5-15762033328a-c000.snappy.parquet")
    val acctH = spark.read.parquet("src/main/resources/data/acct_h/part-00000-5ecc49e3-ce9c-4f36-a62f-0570f3c29630-c000.snappy.parquet")
    val crncy = spark.read.parquet("src/main/resources/data/crncy/part-00000-f9e00d5e-88f1-4f9e-b5f8-8077e9202152-c000.snappy.parquet")
    val div = spark.read.parquet("src/main/resources/data/div_/part-00000-711b71d0-68d4-40b5-b6c4-76050a546c2e-c000.snappy.parquet")
    val frsymb = spark.read.parquet("src/main/resources/data/frsymb/part-00000-96618fb3-d593-4e78-a1e1-2eb2ca8d1102-c000.snappy.parquet")
    val paydoc = spark.read.parquet("src/main/resources/data/paydoc/part-00000-179f9159-0a68-4b76-a8b3-9a205e9b4569-c000.snappy.parquet")
    val txn = spark.read.parquet("src/main/resources/data/txn/part-00000-02fff546-1c15-4c28-821e-59e0de92feda-c000.snappy.parquet")

    // NOTE(review): hard-coded DB credentials — move to configuration or
    // environment variables before any non-local use.
    val pgConnectionProperties = new Properties()
    pgConnectionProperties.put("user", "postgres")
    pgConnectionProperties.put("password", "12345678")
    val dbString = "jdbc:postgresql://localhost:5432/task1"

    val dataProduct = spark.read.jdbc(dbString, "public.data_product", pgConnectionProperties)
    val techdmSettingTransactionFilter = spark.read.jdbc(dbString, "public.techdm_setting_transaction_filter", pgConnectionProperties)

    // Settings row for the requested data product.
    val productRow = dataProduct.filter(dataProduct("dp_code") === dataProductInput).collect()(0)
    val asnuCodeFk = productRow.getAs[String]("asnu_code_fk")
    val transactionDepth = productRow.getAs[Int]("transaction_depth_update")

    def checkIfDatesAreSet(start_dt: Option[Column], end_dt: Option[Column]): Boolean = start_dt.isDefined && end_dt.isDefined

    // Latest load timestamp present in paydoc; None when the table is empty.
    // BUG FIX: the original wrapped this value in lit(...) immediately and then
    // tested `tmpMaxDate == null` — a Column reference is never null, so the
    // empty-table branch was unreachable and finalStartDate silently became a
    // null Column (filtering out every row). Also: getAs[TimestampType] used
    // the Spark type descriptor as the element type; the element is a
    // java.sql.Timestamp.
    val maxInputDttm = Option(paydoc.select(max(paydoc("input_dttm"))).first().getAs[java.sql.Timestamp](0))

    val currentDate = current_timestamp()

    // Load window: explicit dates win; otherwise look `transactionDepth` days
    // back from the earlier of (last load, now).
    val finalStartDate: Column =
      if (checkIfDatesAreSet(startDate, endDate)) startDate.get
      else maxInputDttm.fold(date_sub(currentDate, transactionDepth)) { ts =>
        least(date_sub(lit(ts), transactionDepth), date_sub(currentDate, transactionDepth))
      }

    val finalEndDate = if (checkIfDatesAreSet(startDate, endDate)) endDate.get else date_sub(currentDate, 1)

    // Account-mask filter settings for this data product.
    val transactionFilters = techdmSettingTransactionFilter.filter(techdmSettingTransactionFilter("dp_code_fk") === dataProductInput)

    // Builds one combined LIKE predicate over col("num") from a comma-separated
    // mask-list column. ROBUSTNESS FIX: the original duplicated this pipeline
    // for dt/kt and ended in `.reduce(_ or _)`, which throws
    // UnsupportedOperationException when no masks are configured; with no masks
    // we now match all rows instead.
    def maskFilter(maskListColumn: String): Column =
      transactionFilters.select(col(maskListColumn))
        .filter(col(maskListColumn).isNotNull)
        .collect()
        .map(_.getString(0))
        .flatMap(_.split(",\\s"))
        .map(mask => col("num").like(mask))
        .reduceOption(_ or _)
        .getOrElse(lit(true))

    val dtFilter = maskFilter("dt_account_mask_list")
    val ktFilter = maskFilter("kt_account_mask_list")

    // Select paydocs inside the load window for this data product.
    val filteredPaydoc = paydoc.filter(finalStartDate <= paydoc("input_dttm") && paydoc("input_dttm") <= finalEndDate && paydoc("paydoc_dp_code") === dataProductInput)

    // Debit / credit half-postings.
    val dtPaydoc = filteredPaydoc.join(txn, filteredPaydoc("paydoc_sid") === txn("paydoc_sid") && filteredPaydoc("paydoc_dp_code") === txn("paydoc_dp_code") && txn("cred_flag") === false).drop(txn("input_dttm"))
    val ktPaydoc = filteredPaydoc.join(txn, filteredPaydoc("paydoc_sid") === txn("paydoc_sid") && filteredPaydoc("paydoc_dp_code") === txn("paydoc_dp_code") && txn("cred_flag") === true).drop(txn("input_dttm"))

    // Account attributes, restricted by the configured masks.
    val dtAccount = dtPaydoc.join(acct, acct("acct_sk") === txn("acct_sk") && acct("acct_dp_code") === txn("acct_dp_code")).drop(acct("acct_sk"), txn("acct_dp_code")).filter(dtFilter)
    val ktAccount = ktPaydoc.join(acct, acct("acct_sk") === txn("acct_sk") && acct("acct_dp_code") === txn("acct_dp_code")).drop(acct("acct_sk"), txn("acct_dp_code")).filter(ktFilter)

    // Currency attributes.
    val dtCur = dtAccount.join(crncy, crncy("crncy_sk") === dtAccount("crncy_sk") && crncy("crncy_dp_code") === dtAccount("crncy_dp_code"))
    val ktCur = ktAccount.join(crncy, crncy("crncy_sk") === ktAccount("crncy_sk") && crncy("crncy_dp_code") === ktAccount("crncy_dp_code"))

    // Branch (filial): account history version effective at posting time.
    val dtFilial = dtCur.join(acctH, acctH("acct_sk") === dtCur("acct_sk") && acctH("acct_dp_code") === dtCur("acct_dp_code") && acctH("end_dt") >= dtCur("input_dttm"), "left")
    val ktFilial = ktCur.join(acctH, acctH("acct_sk") === ktCur("acct_sk") && acctH("acct_dp_code") === ktCur("acct_dp_code") && acctH("end_dt") >= ktCur("input_dttm"), "left")

    val dtFilialWithDiv = dtFilial.join(div, div("div_sk") === dtFilial("bal_div_sk") && div("div_dp_code") === dtFilial("bal_div_dp_code"), "left")
    val ktFilialWithDiv = ktFilial.join(div, div("div_sk") === ktFilial("bal_div_sk") && div("div_dp_code") === ktFilial("bal_div_dp_code"), "left")

    // OFR symbols.
    val dtFrSymb = dtFilialWithDiv.join(frsymb, frsymb("frsymb_sk") === dtFilialWithDiv("frsymb_sk") && frsymb("frsymb_dp_code") === dtFilialWithDiv("frsymb_dp_code"), "left")
    val ktFrSymb = ktFilialWithDiv.join(frsymb, frsymb("frsymb_sk") === ktFilialWithDiv("frsymb_sk") && frsymb("frsymb_dp_code") === ktFilialWithDiv("frsymb_dp_code"), "left")

    // Assemble the debit and credit sides of the mart.
    val dtFull = dtFrSymb
      .withColumn("asnu_code_fk", lit(asnuCodeFk))
      .withColumn("dp_code_fk", lit(dataProductInput))
      .select(filteredPaydoc("paydoc_sid") as "transaction_ek",
        filteredPaydoc("input_dttm") as "transaction_date",
        to_date(filteredPaydoc("input_dttm")) as "input_dt",
        to_date(filteredPaydoc("bal_dttm")) as "value_date",
        filteredPaydoc("doc_num") as "mem_order_number",
        filteredPaydoc("doc_dt") as "mem_order_date",
        acct("sid") as "dt_account_fk",
        acct("acct_dp_code") as "dt_account_dp_code",
        acct("num") as "dt_account",
        crncy("sid") as "dt_currency_fk",
        crncy("crncy_dp_code") as "dt_currency_dp_code",
        crncy("iso_code") as "dt_currency_alfa_code",
        crncy("digital_code") as "dt_currency_digital_code",
        txn("sum_act_amt") as "dt_amount",
        txn("sum_src_bal_amt") as "amount_rub",
        div("sid") as "dt_filial_fk",
        div("div_dp_code") as "dt_filial_dp_code",
        div("code") as "dt_filial_code",
        formatOfrUDF(frsymb("num"), acct("num")) as "dt_ofr",
        filteredPaydoc("doc_purp_txt") as "payment_purpose",
        filteredPaydoc("ctl_validfrom") as "ctl_validfrom_ts"
      )
    val ktFull = ktFrSymb
      .withColumn("asnu_code_fk", lit(asnuCodeFk))
      .withColumn("dp_code_fk", lit(dataProductInput))
      .select(filteredPaydoc("paydoc_sid") as "transaction_ek",
        acct("sid") as "kt_account_fk",
        acct("acct_dp_code") as "kt_account_dp_code",
        acct("num") as "kt_account",
        crncy("sid") as "kt_currency_fk",
        crncy("crncy_dp_code") as "kt_currency_dp_code",
        crncy("iso_code") as "kt_currency_alfa_code",
        crncy("digital_code") as "kt_currency_digital_code",
        txn("sum_act_amt") as "kt_amount",
        div("sid") as "kt_filial_fk",
        div("div_dp_code") as "kt_filial_dp_code",
        div("code") as "kt_filial_code",
        formatOfrUDF(frsymb("num"), acct("num")) as "kt_ofr"
      )

    // Final dataframe: pair the two half-postings of each transaction.
    // (Dropped a no-op alias the original applied inside to_date(...); the
    // column is named by withColumn either way.)
    val techdmSrcTransaction = dtFull.join(ktFull, dtFull("transaction_ek") === ktFull("transaction_ek")).drop(ktFull("transaction_ek"))
      .withColumn("operation_date", to_date(processDateUDF(col("value_date"), col("dt_account"), col("kt_account"))))
    techdmSrcTransaction.show()
    //techdmSrcTransaction.write.mode("overwrite").save("src/main/resources/data/result")
    techdmSrcTransaction.write.mode("overwrite").jdbc(dbString, "techdm_transaction", pgConnectionProperties)
    spark.stop()
  }

  // Entry point: build the mart for data product "ndl_eks_gl" over a fixed window.
  makeTechdmTransaction("ndl_eks_gl", Option(to_date(lit("2015-03-06"))), Option(to_date(lit("2024-03-06"))))
}
|
||||||
Loading…
Reference in new issue