kltn20133118 commited on
Commit
7fb2e71
verified
1 Parent(s): 413061c

Update function/dropbox.py

Browse files
Files changed (1) hide show
  1. function/dropbox.py +154 -154
function/dropbox.py CHANGED
@@ -1,155 +1,155 @@
1
- import dropbox.files
2
- import os
3
- import shutil
4
- import requests, base64
5
- from fastapi import HTTPException
6
- from dotenv import load_dotenv
7
- import os
8
- load_dotenv()
9
- DROPBOX_APP_KEY=os.getenv('DROPBOX_APP_KEY')
10
- DROPBOX_APP_SECRET=os.getenv('DROPBOX_APP_SECRET')
11
- DROPBOX_REFRESH_TOKEN=os.getenv('DROPBOX_REFRESH_TOKEN')
12
-
13
- def refresh_token_dropbox():
14
- app_key = DROPBOX_APP_KEY
15
- app_secret = DROPBOX_APP_SECRET
16
- refresh_token = DROPBOX_REFRESH_TOKEN
17
- url = 'https://api.dropbox.com/oauth2/token'
18
- auth_string = f"{app_key}:{app_secret}"
19
- base64authorization = base64.b64encode(auth_string.encode()).decode('utf-8')
20
- headers = {
21
- 'Authorization': f'Basic {base64authorization}',
22
- 'Content-Type': 'application/x-www-form-urlencoded'
23
- }
24
- data = {
25
- 'refresh_token': refresh_token,
26
- 'grant_type': 'refresh_token'
27
- }
28
- response = requests.post(url, headers=headers, data=data)
29
- response_json = response.json()
30
- access_token = response_json.get('access_token', None)
31
- return access_token
32
-
33
- def delete_file(id,name_file):
34
- try:
35
- TOKEN = refresh_token_dropbox()
36
- dbx=dropbox.Dropbox(TOKEN)
37
- file_path = f"/{id}/{name_file}"
38
- dbx.files_delete_v2(file_path)
39
- print(f"X贸a file '{file_path}' th脿nh c么ng.")
40
- except dropbox.exceptions.ApiError as e:
41
- print(f"L峄梚 khi x贸a file '{file_path}': {e}")
42
-
43
- def list_files(id):
44
- file_names = []
45
- try:
46
- TOKEN = refresh_token_dropbox()
47
- dbx=dropbox.Dropbox(TOKEN)
48
- result = dbx.files_list_folder(f"/{id}")
49
- for entry in result.entries:
50
- if isinstance(entry, dropbox.files.FileMetadata):
51
- file_names.append(os.path.basename(entry.path_display))
52
- except dropbox.exceptions.ApiError as e:
53
- print(f"Error listing files: {e}")
54
- return file_names
55
-
56
- def upload_file_fix(local_path,cloud_path,token):
57
- try:
58
- TOKEN = refresh_token_dropbox()
59
- dbx=dropbox.Dropbox(TOKEN)
60
- with open(local_path, "rb") as f:
61
- data = f.read()
62
- dbx.files_upload(data, cloud_path)
63
- print(f"Uploaded file '{local_path}' to '{cloud_path}'")
64
- except dropbox.exceptions.ApiError as e:
65
- print(f"Error uploading file '{local_path}': {e}")
66
-
67
- def upload_file(local_path, cloud_path):
68
- try:
69
- TOKEN = refresh_token_dropbox()
70
- dbx=dropbox.Dropbox(TOKEN)
71
- with open(local_path, "rb") as f:
72
- data = f.read()
73
- dbx.files_upload(data, cloud_path)
74
- print(f"Uploaded file '{local_path}' to '{cloud_path}'")
75
- except dropbox.exceptions.ApiError as e:
76
- upload_file_fix()
77
-
78
- def clear_local_folder(path):
79
- try:
80
- for filename in os.listdir(path):
81
- file_path = os.path.join(path, filename)
82
- if os.path.isfile(file_path) or os.path.islink(file_path):
83
- os.unlink(file_path)
84
- elif os.path.isdir(file_path):
85
- shutil.rmtree(file_path)
86
- except Exception as e:
87
- print(f"Failed to delete contents of {path}. Reason: {e}")
88
-
89
- def download_folder(id):
90
- try:
91
- TOKEN = refresh_token_dropbox()
92
- dbx = dropbox.Dropbox(TOKEN)
93
- local_path = f"./user_file/{id}"
94
- os.makedirs(local_path, exist_ok=True)
95
- clear_local_folder(local_path)
96
- result = dbx.files_list_folder(f"/{id}")
97
- for entry in result.entries:
98
- if isinstance(entry, dropbox.files.FileMetadata):
99
- cloud_file_path = entry.path_display
100
- file_name = os.path.basename(cloud_file_path)
101
- local_file_path = os.path.join(local_path, file_name)
102
- dbx.files_download_to_file(local_file_path, cloud_file_path)
103
- print(f"Downloaded file '{file_name}' to '{local_file_path}'")
104
- except dropbox.exceptions.ApiError as e:
105
- print(f"Error downloading file '{id}': {e}")
106
-
107
- def download_file_id(file_name, id):
108
- try:
109
- TOKEN = refresh_token_dropbox()
110
- dbx = dropbox.Dropbox(TOKEN)
111
- local_folder_path = f"./user_file/{id}"
112
- os.makedirs(local_folder_path, exist_ok=True)
113
- local_file_path = os.path.join(local_folder_path, file_name)
114
- with open(local_file_path, "wb") as f:
115
- metadata, response = dbx.files_download(f"/{id}/{file_name}")
116
- f.write(response.content)
117
- print(f"Downloaded file '{file_name}' to '{local_file_path}'")
118
- except dropbox.exceptions.ApiError as e:
119
- print(f"Error downloading file '{file_name}': {e}")
120
- raise HTTPException(status_code=500, detail="Internal Server Error")
121
-
122
- def search_and_download_file(start_char, id):
123
- try:
124
- TOKEN = refresh_token_dropbox()
125
- dbx = dropbox.Dropbox(TOKEN)
126
- result = dbx.files_list_folder(f"/{id}")
127
- files_starting_with_char = [entry.name for entry in result.entries if entry.name.startswith(start_char)]
128
- if len(files_starting_with_char) == 0:
129
- print(f"No file found starting with '{start_char}' in folder '{id}'")
130
- return
131
- file_name = files_starting_with_char[0]
132
- local_folder_path = f"./user_file/{id}"
133
- os.makedirs(local_folder_path, exist_ok=True)
134
- local_file_path = os.path.join(local_folder_path, file_name)
135
- with open(local_file_path, "wb") as f:
136
- metadata, response = dbx.files_download(f"/{id}/{file_name}")
137
- f.write(response.content)
138
- print(f"Downloaded file '{file_name}' to '{local_file_path}'")
139
- except dropbox.exceptions.ApiError as e:
140
- print(f"Error searching or downloading file: {e}")
141
- raise HTTPException(status_code=500, detail="Internal Server Error")
142
-
143
- def delete_all_files_in_folder(folder_id):
144
- try:
145
- TOKEN = refresh_token_dropbox()
146
- dbx = dropbox.Dropbox(TOKEN)
147
- result = dbx.files_list_folder(f"/{folder_id}")
148
- for entry in result.entries:
149
- if isinstance(entry, dropbox.files.FileMetadata):
150
- file_path = entry.path_display
151
- dbx.files_delete_v2(file_path)
152
- print(f"Deleted file '{file_path}'")
153
- print(f"All files in folder '{folder_id}' have been deleted.")
154
- except dropbox.exceptions.ApiError as e:
155
  print(f"Error deleting files: {e}")
 
1
+ import dropbox.files
2
+ import os
3
+ import shutil
4
+ import requests, base64
5
+ from fastapi import HTTPException
6
+ from dotenv import load_dotenv
7
+ import os
8
+ load_dotenv()
9
+ DROPBOX_APP_KEY=os.getenv('DROPBOX_APP_KEY')
10
+ DROPBOX_APP_SECRET=os.getenv('DROPBOX_APP_SECRET')
11
+ DROPBOX_REFRESH_TOKEN=os.getenv('DROPBOX_REFRESH_TOKEN')
12
+
13
+ def refresh_token_dropbox():
14
+ app_key = DROPBOX_APP_KEY
15
+ app_secret = DROPBOX_APP_SECRET
16
+ refresh_token = DROPBOX_REFRESH_TOKEN
17
+ url = 'https://api.dropbox.com/oauth2/token'
18
+ auth_string = f"{app_key}:{app_secret}"
19
+ base64authorization = base64.b64encode(auth_string.encode()).decode('utf-8')
20
+ headers = {
21
+ 'Authorization': f'Basic {base64authorization}',
22
+ 'Content-Type': 'application/x-www-form-urlencoded'
23
+ }
24
+ data = {
25
+ 'refresh_token': refresh_token,
26
+ 'grant_type': 'refresh_token'
27
+ }
28
+ response = requests.post(url, headers=headers, data=data)
29
+ response_json = response.json()
30
+ access_token = response_json.get('access_token', None)
31
+ return access_token
32
+
33
+ def delete_file(id,name_file):
34
+ try:
35
+ TOKEN = refresh_token_dropbox()
36
+ dbx=dropbox.Dropbox(TOKEN)
37
+ file_path = f"/{id}/{name_file}"
38
+ dbx.files_delete_v2(file_path)
39
+ print(f"X贸a file '{file_path}' th脿nh c么ng.")
40
+ except dropbox.exceptions.ApiError as e:
41
+ print(f"L峄梚 khi x贸a file '{file_path}': {e}")
42
+
43
+ def list_files(id):
44
+ file_names = []
45
+ try:
46
+ TOKEN = refresh_token_dropbox()
47
+ dbx=dropbox.Dropbox(TOKEN)
48
+ result = dbx.files_list_folder(f"/{id}")
49
+ for entry in result.entries:
50
+ if isinstance(entry, dropbox.files.FileMetadata):
51
+ file_names.append(os.path.basename(entry.path_display))
52
+ except dropbox.exceptions.ApiError as e:
53
+ print(f"Error listing files: {e}")
54
+ return file_names
55
+
56
+ def upload_file_fix(local_path,cloud_path,token):
57
+ try:
58
+ TOKEN = refresh_token_dropbox()
59
+ dbx=dropbox.Dropbox(TOKEN)
60
+ with open(local_path, "rb") as f:
61
+ data = f.read()
62
+ dbx.files_upload(data, cloud_path)
63
+ print(f"Uploaded file '{local_path}' to '{cloud_path}'")
64
+ except dropbox.exceptions.ApiError as e:
65
+ print(f"Error uploading file '{local_path}': {e}")
66
+
67
+ def upload_file(local_path, cloud_path):
68
+ try:
69
+ TOKEN = refresh_token_dropbox()
70
+ dbx=dropbox.Dropbox(TOKEN)
71
+ with open(local_path, "rb") as f:
72
+ data = f.read()
73
+ dbx.files_upload(data, cloud_path)
74
+ print(f"Uploaded file '{local_path}' to '{cloud_path}'")
75
+ except dropbox.exceptions.ApiError as e:
76
+ upload_file_fix()
77
+
78
+ def clear_local_folder(path):
79
+ try:
80
+ for filename in os.listdir(path):
81
+ file_path = os.path.join(path, filename)
82
+ if os.path.isfile(file_path) or os.path.islink(file_path):
83
+ os.unlink(file_path)
84
+ elif os.path.isdir(file_path):
85
+ shutil.rmtree(file_path)
86
+ except Exception as e:
87
+ print(f"Failed to delete contents of {path}. Reason: {e}")
88
+
89
+ def download_folder(id):
90
+ try:
91
+ TOKEN = refresh_token_dropbox()
92
+ dbx = dropbox.Dropbox(TOKEN)
93
+ local_path = f"./code/user_file/{id}"
94
+ os.makedirs(local_path, exist_ok=True)
95
+ clear_local_folder(local_path)
96
+ result = dbx.files_list_folder(f"/{id}")
97
+ for entry in result.entries:
98
+ if isinstance(entry, dropbox.files.FileMetadata):
99
+ cloud_file_path = entry.path_display
100
+ file_name = os.path.basename(cloud_file_path)
101
+ local_file_path = os.path.join(local_path, file_name)
102
+ dbx.files_download_to_file(local_file_path, cloud_file_path)
103
+ print(f"Downloaded file '{file_name}' to '{local_file_path}'")
104
+ except dropbox.exceptions.ApiError as e:
105
+ print(f"Error downloading file '{id}': {e}")
106
+
107
+ def download_file_id(file_name, id):
108
+ try:
109
+ TOKEN = refresh_token_dropbox()
110
+ dbx = dropbox.Dropbox(TOKEN)
111
+ local_folder_path = f"/code/user_file/{id}"
112
+ os.makedirs(local_folder_path, exist_ok=True)
113
+ local_file_path = os.path.join(local_folder_path, file_name)
114
+ with open(local_file_path, "wb") as f:
115
+ metadata, response = dbx.files_download(f"/{id}/{file_name}")
116
+ f.write(response.content)
117
+ print(f"Downloaded file '{file_name}' to '{local_file_path}'")
118
+ except dropbox.exceptions.ApiError as e:
119
+ print(f"Error downloading file '{file_name}': {e}")
120
+ raise HTTPException(status_code=500, detail="Internal Server Error")
121
+
122
+ def search_and_download_file(start_char, id):
123
+ try:
124
+ TOKEN = refresh_token_dropbox()
125
+ dbx = dropbox.Dropbox(TOKEN)
126
+ result = dbx.files_list_folder(f"/{id}")
127
+ files_starting_with_char = [entry.name for entry in result.entries if entry.name.startswith(start_char)]
128
+ if len(files_starting_with_char) == 0:
129
+ print(f"No file found starting with '{start_char}' in folder '{id}'")
130
+ return
131
+ file_name = files_starting_with_char[0]
132
+ local_folder_path = f"./code/user_file/{id}"
133
+ os.makedirs(local_folder_path, exist_ok=True)
134
+ local_file_path = os.path.join(local_folder_path, file_name)
135
+ with open(local_file_path, "wb") as f:
136
+ metadata, response = dbx.files_download(f"/{id}/{file_name}")
137
+ f.write(response.content)
138
+ print(f"Downloaded file '{file_name}' to '{local_file_path}'")
139
+ except dropbox.exceptions.ApiError as e:
140
+ print(f"Error searching or downloading file: {e}")
141
+ raise HTTPException(status_code=500, detail="Internal Server Error")
142
+
143
+ def delete_all_files_in_folder(folder_id):
144
+ try:
145
+ TOKEN = refresh_token_dropbox()
146
+ dbx = dropbox.Dropbox(TOKEN)
147
+ result = dbx.files_list_folder(f"/{folder_id}")
148
+ for entry in result.entries:
149
+ if isinstance(entry, dropbox.files.FileMetadata):
150
+ file_path = entry.path_display
151
+ dbx.files_delete_v2(file_path)
152
+ print(f"Deleted file '{file_path}'")
153
+ print(f"All files in folder '{folder_id}' have been deleted.")
154
+ except dropbox.exceptions.ApiError as e:
155
  print(f"Error deleting files: {e}")