Spaces: Build error
import traceback
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import col, regexp_replace, concat_ws, when, collect_list, lit, to_timestamp
from pyspark.sql.functions import year, month, date_format
from pyspark.sql import functions as F
from pyspark.sql.types import LongType, DecimalType, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import *
from pytz import timezone
from datetime import timedelta
from pyspark.sql.window import Window
import json
import sys
import logging
import datetime
import time
import os
import psycopg2
import requests
from requests.auth import HTTPBasicAuth
import base64
import functools
import boto3
from botocore.exceptions import ClientError  # used in get_secret() error handling
# add '/home/hadoop' (EMR master node) to the Python path, since downloaded packages live there
sys.path.append('/home/hadoop')
curr_time = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
log_file_name = 'job_' + str(datetime.datetime.now().strftime('%Y%m%d_%H%M%S_%f')) + '.log'
extra = {'log_file_name': log_file_name}
logger = logging.getLogger(__name__)
syslog = logging.FileHandler(log_file_name, mode='w')
formatter = logging.Formatter('%(log_file_name)s;%(asctime)s;%(levelname)s;%(message)s')
syslog.setFormatter(formatter)
logger.setLevel(logging.INFO)
logger.addHandler(syslog)
logger = logging.LoggerAdapter(logger, extra)
def read_config(config_path):
    logger.info("Inside read config")
    try:
        # check whether the config path provided as input is an S3 path or a local file system path
        if config_path[0:2] == 's3':
            # read config file from S3
            logger.info("Reading config file from S3")
            s3 = boto3.resource('s3')
            file_object = s3.Object(config_path.split('/')[2], '/'.join(config_path.split('/')[3:]))
            file_content = file_object.get()['Body'].read().decode('utf-8')
            # parse the file content as JSON
            json_content = json.loads(file_content)
            json_object = json.dumps(json_content)
        else:
            # read config file from the local file system
            logger.info("Reading config file from path : " + config_path)
            # parse the file content as JSON
            json_content = json.load(open(config_path, 'r'))
            json_object = json.dumps(json_content)
        logger.info("Input Config Details:")
        logger.info(json_object)
        return json_content
    except Exception as e:
        logger.error(traceback.format_exc())
        raise Exception("Error reading config.")
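# Illustrative only: based on how `config` is used throughout this script, the config file is
# assumed to look roughly like the sketch below; every key name comes from the code, but all
# values are made-up examples.
# {
#   "application_name": "sales_mart_load",
#   "spark_properties": {"spark.executor.memory": "4g"},
#   "Paths": {"orders": "s3://my-bucket/hudi/orders/"},
#   "audit_table": "int.job_audit",
#   "redshift_secret": "redshift/cluster/creds",
#   "redshift_iam_role": "arn:aws:iam::123456789012:role/redshift-copy-role",
#   "incr2df": "orders_incr",   "incr2df_path": "s3://my-bucket/tmp/orders_incr/",
#   "resultdf": "orders_result", "resultdf_path": "s3://my-bucket/tmp/orders_result/"
# }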
def get_secret(secret):
    secret_name = secret
    region_name = "ap-south-1"
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name,
    )
    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFoundException':
            print("The requested secret " + secret_name + " was not found")
        elif e.response['Error']['Code'] == 'InvalidRequestException':
            print("The request was invalid due to:", e)
        elif e.response['Error']['Code'] == 'InvalidParameterException':
            print("The request had invalid params:", e)
        elif e.response['Error']['Code'] == 'DecryptionFailure':
            print("The requested secret can't be decrypted using the provided KMS key:", e)
        elif e.response['Error']['Code'] == 'InternalServiceError':
            print("An error occurred on the service side:", e)
        raise
    else:
        # Secrets Manager decrypts the secret value using the associated KMS CMK.
        # Depending on whether the secret was stored as a string or binary, only one of these fields is populated.
        logger.info("Secret manager read complete")
        if 'SecretString' in get_secret_value_response:
            text_secret_data = get_secret_value_response['SecretString']
            return text_secret_data
        else:
            binary_secret_data = get_secret_value_response['SecretBinary']
            return binary_secret_data
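# Illustrative only: main() parses the returned secret as JSON and reads these keys, so the
# Redshift secret is assumed to be stored as a string of this shape (values are examples).
# {"username": "etl_user", "password": "...", "host": "my-cluster.xxxxx.ap-south-1.redshift.amazonaws.com",
#  "port": 5439, "dbname": "analytics"}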
def create_spark_session(config):
    logger.info("Inside create spark session")
    try:
        conf = SparkConf()
        # set the Spark configuration properties provided in the config file
        spark_conf = dict(config['spark_properties'])
        for key in spark_conf.keys():
            conf.set(key, spark_conf[key])
        logger.info("Spark configuration properties applied")
        # resolve the application name, falling back to a default
        if 'application_name' in list(config.keys()) and config['application_name'] != '':
            app_name = config['application_name']
        else:
            app_name = 'DefaultApp'
        logger.info("Creating Spark session")
        # create the Spark session
        spark = SparkSession.builder.config(conf=conf).appName(app_name).enableHiveSupport().getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")
        spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
        spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", 'LEGACY')
        spark.conf.set("spark.sql.legacy.timeParserPolicy", 'CORRECTED')
        spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", 'CORRECTED')
        spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", 'CORRECTED')
        spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInRead", 'CORRECTED')
        spark.conf.set("spark.sql.shuffle.partitions", 100)
        logger.info("Spark session object created")
        return spark
    except Exception as e:
        logger.error(traceback.format_exc())
        raise Exception("Error in Spark Session Creation.")
def read_file(spark, config, table):
    # note: readOptions is defined but not passed to the reader below, so a snapshot read is performed
    readOptions = {
        'hoodie.datasource.query.type': 'incremental',
        'hoodie.datasource.hive_sync.support_timestamp': 'true'
    }
    path = config['Paths'][table]
    df = spark.read.format("hudi").load(path)
    # _hoodie_commit_time is stored as a yyyyMMddHHmmss... string; rebuild it as a proper timestamp
    df = df.withColumn('_hoodie_commit_time',
                       to_timestamp(F.concat(F.substring(col('_hoodie_commit_time'), 1, 4), F.lit('-'),
                                             F.substring(col('_hoodie_commit_time'), 5, 2), F.lit('-'),
                                             F.substring(col('_hoodie_commit_time'), 7, 2), F.lit(' '),
                                             F.substring(col('_hoodie_commit_time'), 9, 2), F.lit(':'),
                                             F.substring(col('_hoodie_commit_time'), 11, 2), F.lit(':'),
                                             F.substring(col('_hoodie_commit_time'), 13, 2))))
    return df
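# For example, a raw commit time of '20230115103045' becomes the timestamp '2023-01-15 10:30:45'.
# Depending on the Hudi version the raw value may also carry a millisecond suffix, which the
# 14-character slice above simply ignores. (The example value is illustrative.)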
def get_max_audit_batch(conn, job_name, config):
    # next batch id = current maximum batch_id in the audit table + 1 (1 if the table is empty)
    cur = conn.cursor()
    cur.execute("SELECT COALESCE(MAX(COALESCE(BATCH_ID,0)),0)+1 FROM " + config['audit_table'])
    result = cur.fetchall()[0][0]
    logger.info("Next batch id for Audit Table is :" + str(result))
    return result
def read_max_update_date(conn, job_name, table, config):
    try:
        cur = conn.cursor()
        cur.execute("SELECT MAX(max_update_date) FROM " + config['audit_table'] +
                    " WHERE mart_table_name = '" + job_name + "' AND src_table_name = '" + table + "'")
        # fetchall() returns a list of tuples, e.g. [(datetime(...),)] or [(None,)] on a first run
        query_results = cur.fetchall()
    except Exception as e:
        print("Database connection failed due to {}".format(e))
        raise Exception("Error reading audit table.")
    logger.info("Reading max of max_update_date from audit table complete")
    return query_results
def insert_max_update_date(spark, conn, job_name, table, max_update_date, source_reference_date, max_batch_id, config):
    try:
        cur = conn.cursor()
        cur.execute("INSERT INTO " + config['audit_table'] +
                    " (mart_table_name, src_table_name, max_update_date, load_timestamp, source_reference_date, batch_id)"
                    " VALUES ('" + str(job_name) + "', '" + str(table) + "', '" + str(max_update_date) +
                    "', SYSDATE, '" + str(source_reference_date) + "', cast('" + str(max_batch_id) + "' as int))")
    except Exception as e:
        print("Database connection failed due to {}".format(e))
        raise Exception("Error updating audit table.")
    logger.info("Inserting max_update_date into audit table complete")
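# Illustrative only: the audit table referenced by config['audit_table'] is assumed to look
# roughly like the DDL below. The column names come from the queries above; the table name
# and column types are guesses.
#   CREATE TABLE int.job_audit (
#       mart_table_name       VARCHAR(256),
#       src_table_name        VARCHAR(256),
#       max_update_date       TIMESTAMP,
#       load_timestamp        TIMESTAMP,
#       source_reference_date TIMESTAMP,
#       batch_id              INT
#   );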
def write_file(spark, conn, redshift_iam_role, resultdf_path, config, table_name):
    # load the Parquet output into the target table via TRUNCATE + COPY
    logger.info("write data to redshift started")
    try:
        cur = conn.cursor()
        cur.execute(f"""TRUNCATE TABLE int.{table_name}; commit;""")
        sql = """COPY %s FROM '%s' credentials 'aws_iam_role=%s' FORMAT PARQUET; commit;""" % \
              (f"int.{table_name}", resultdf_path, redshift_iam_role)
        cur.execute(sql)
    except Exception as e:
        print("Database connection failed due to {}".format(e))
        raise Exception("Error inserting into target table.")
    print("write complete")
    logger.info("write data to redshift completed")
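# Illustrative only: with table_name='orders_result', resultdf_path='s3://my-bucket/tmp/orders_result/'
# and an example role ARN, the rendered statement would look like:
#   COPY int.orders_result FROM 's3://my-bucket/tmp/orders_result/'
#   credentials 'aws_iam_role=arn:aws:iam::123456789012:role/redshift-copy-role' FORMAT PARQUET; commit;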
def main():
    logger.info("Inside main function")
    if len(sys.argv) != 2:
        logger.info(len(sys.argv))
        logger.info("Command line arguments : " + str(sys.argv))
        logger.info("Incorrect command line arguments.")
        sys.exit(1)
    config = {}
    spark = None
    redshift_conn = None
    job_status = ''
    try:
        # read the JSON config file
        logger.info("Calling function to read config file")
        config = read_config(sys.argv[1])
        logger.info("Calling function to create Spark session object")
        # create the Spark session
        spark = create_spark_session(config)
        logger.info("Calling function to read input file")
        start_time = datetime.datetime.now(timezone("Asia/Kolkata")).strftime('%Y-%m-%d %H:%M:%S')
        # fetch Redshift credentials from Secrets Manager
        redshift_secret = get_secret(config['redshift_secret'])
        redshift_secret = json.loads(redshift_secret)
        redshift_user = redshift_secret['username']
        redshift_pwd = redshift_secret['password']
        redshift_host = redshift_secret['host']
        redshift_port = str(redshift_secret['port'])
        redshift_dbname = redshift_secret['dbname']
        # create the database connection
        redshift_conn = psycopg2.connect(dbname=redshift_dbname, host=redshift_host, port=redshift_port, user=redshift_user, password=redshift_pwd)
        redshift_dburl = "jdbc:postgresql://" + redshift_host + ":" + redshift_port + "/" + redshift_dbname
        cur = redshift_conn.cursor()
        max_batch_id = get_max_audit_batch(redshift_conn, config['application_name'], config)
        INSERT_CODE_1
        # write from Parquet into the target tables in the database
        write_file(spark, redshift_conn, config['redshift_iam_role'], config['incr2df_path'], config, config['incr2df'])
        write_file(spark, redshift_conn, config['redshift_iam_role'], config['resultdf_path'], config, config['resultdf'])
        INSERT_CODE_2
        print('Run Successful')
        print('End of Code')
    except Exception as e:
        # the job failed
        job_status = 'Failed'
        print(e)
        logger.error(traceback.format_exc())
    finally:
        # clean up; guard against failures that happen before spark / the connection were created
        if spark is not None:
            spark.catalog.clearCache()
        if redshift_conn is not None:
            redshift_conn.commit()
            redshift_conn.close()
        if spark is not None:
            spark.stop()
if __name__ == "__main__":
    # call the main function
    logger.info("Calling main function")
    main()
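# Illustrative invocation (names are examples): the script takes a single argument, the config
# path, which may be a local path or an S3 URI, e.g. when submitted on the EMR master node:
#   spark-submit job.py s3://my-bucket/configs/sales_mart.json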