Update spark_setup.py
spark_setup.py  CHANGED  (+40 -40)
@@ -1,40 +1,40 @@
-# spark_setup.py
-from pyspark.sql import SparkSession
-
-# Initialize Spark session
-def create_spark_session(
-
-
-
-
-
-
-# Load data into Spark DataFrames and return a dictionary of DataFrames
-def load_data(spark, file_paths):
-    dataframes = {}
-    for name, path in file_paths.items():
-        dataframes[name] = spark.read.csv(path, header=True, inferSchema=True)
-        # Register as a temporary view
-        dataframes[name].createOrReplaceTempView(name)
-    return dataframes
-
-# File paths for each dataset
-file_paths = {
-    "author_affiliations": "data/author_affiliations.csv",
-    "affiliations": "data/affiliations.csv",
-    "subject_areas": "data/subject_areas.csv",
-    "keywords": "data/keywords.csv",
-    "publications": "data/publications.csv",
-    "authors": "data/authors.csv",
-    "embeddings": "data/embeddings.csv",
-    "clustering": "data/clustering.csv",
-    "abstracts": "data/abstracts.csv",
-    "geospatial_clustering_data": "data/geospatial_clustering_data.csv",
-    "geospatial_data_by_publication": "data/geospatial_data_by_publication.csv",
-    "scopus_affiliation_data": "data/scopus_affiliation_data.csv"
-}
-
-if __name__ == "__main__":
-    spark = create_spark_session()
-    dataframes = load_data(spark, file_paths)
-    print("Spark session initialized and data loaded.")
+# spark_setup.py
+from pyspark.sql import SparkSession
+
+# Initialize Spark session
+def create_spark_session():
+    return (SparkSession.builder
+            .appName("HFSpaceSparkSession")
+            .config("spark.executor.memory", "1g")
+            .config("spark.driver.memory", "1g")
+            .getOrCreate())
+
+# Load data into Spark DataFrames and return a dictionary of DataFrames
+def load_data(spark, file_paths):
+    dataframes = {}
+    for name, path in file_paths.items():
+        dataframes[name] = spark.read.csv(path, header=True, inferSchema=True)
+        # Register as a temporary view
+        dataframes[name].createOrReplaceTempView(name)
+    return dataframes
+
+# File paths for each dataset
+file_paths = {
+    "author_affiliations": "data/author_affiliations.csv",
+    "affiliations": "data/affiliations.csv",
+    "subject_areas": "data/subject_areas.csv",
+    "keywords": "data/keywords.csv",
+    "publications": "data/publications.csv",
+    "authors": "data/authors.csv",
+    "embeddings": "data/embeddings.csv",
+    "clustering": "data/clustering.csv",
+    "abstracts": "data/abstracts.csv",
+    "geospatial_clustering_data": "data/geospatial_clustering_data.csv",
+    "geospatial_data_by_publication": "data/geospatial_data_by_publication.csv",
+    "scopus_affiliation_data": "data/scopus_affiliation_data.csv"
+}
+
+if __name__ == "__main__":
+    spark = create_spark_session()
+    dataframes = load_data(spark, file_paths)
+    print("Spark session initialized and data loaded.")
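Because load_data registers each DataFrame under its dictionary key with createOrReplaceTempView, every dataset becomes queryable by name through Spark SQL once the script has run. A minimal usage sketch (not part of the commit; it assumes the CSV files exist under data/ and that the module is importable as spark_setup):

# usage_example.py -- illustrative only, not part of this commit
from spark_setup import create_spark_session, load_data, file_paths

spark = create_spark_session()
dataframes = load_data(spark, file_paths)

# Each key in file_paths is now a temporary view; query it by name.
# The view name "authors" comes from the script, but its columns are
# not shown in the commit, so SELECT * avoids guessing at the schema.
spark.sql("SELECT * FROM authors LIMIT 10").show()

# The same data is also available through the returned dictionary.
dataframes["publications"].printSchema()

Temporary views are scoped to the SparkSession, so nothing persists after spark.stop(). The 1g executor and driver memory settings, together with the app name HFSpaceSparkSession, suggest the session is sized for a memory-constrained host such as a Hugging Face Space.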