hysts and Wauplin committed
Commit e51b1cb (1 parent: 4578dbd)

Use parquet instead of zip


Co-authored-by: Lucain Pouget <[email protected]>

Files changed (3)
  1. app.py +30 -56
  2. requirements.txt +1 -0
  3. scheduler.py +66 -14
app.py CHANGED
@@ -12,11 +12,11 @@ from typing import Any
 import gradio as gr
 from gradio_client import Client
 
-from scheduler import ZipScheduler
+from scheduler import ParquetScheduler
 
 HF_TOKEN = os.getenv('HF_TOKEN')
 UPLOAD_REPO_ID = os.getenv('UPLOAD_REPO_ID')
-UPLOAD_FREQUENCY = int(os.getenv('UPLOAD_FREQUENCY', '5'))
+UPLOAD_FREQUENCY = int(os.getenv('UPLOAD_FREQUENCY', '15'))
 USE_PUBLIC_REPO = os.getenv('USE_PUBLIC_REPO') == '1'
 LOCAL_SAVE_DIR = pathlib.Path(os.getenv('LOCAL_SAVE_DIR', 'results'))
 LOCAL_SAVE_DIR.mkdir(parents=True, exist_ok=True)
@@ -25,45 +25,18 @@ ABOUT_THIS_SPACE = '''
 This Space is a sample Space that collects user preferences for the results generated by a diffusion model.
 This demo calls the [stable diffusion Space](https://huggingface.co/spaces/stabilityai/stable-diffusion) with the [`gradio_client`](https://pypi.org/project/gradio-client/) library.
 
-The user preference data is periodically zipped and uploaded to [this dataset repo](https://huggingface.co/datasets/hysts-samples/sample-user-preferences).
-The directory structure of the zipped data is as follows:
-```
-results
-├── 11e11b01-3388-48b3-a2ab-1b58d156e466
-│   ├── 000.jpg
-│   ├── 001.jpg
-│   ├── 002.jpg
-│   ├── 003.jpg
-│   └── preferences.json
-├── 1470ec1d-67a1-47ae-ab9c-4e0e0594dadf
-│   ├── 000.jpg
-│   ├── 001.jpg
-│   ├── 002.jpg
-│   ├── 003.jpg
-│   └── preferences.json
-...
-```
-Also, each `preferences.json` looks like this:
-```
-{
-    "prompt": "an astronaut riding a horse",
-    "negative_prompt": "",
-    "guidance_scale": 9,
-    "selected_index": 2,
-    "timestamp": "2023-06-15T07:57:00.097883"
-}
-```
+The user preference data is periodically archived in parquet format and uploaded to [this dataset repo](https://huggingface.co/datasets/hysts-samples/sample-user-preferences).
 
 The periodic upload is done using [`huggingface_hub.CommitScheduler`](https://huggingface.co/docs/huggingface_hub/main/en/package_reference/hf_api#huggingface_hub.CommitScheduler).
 See [this Space](https://huggingface.co/spaces/Wauplin/space_to_dataset_saver) for more general usage.
 '''
 
-scheduler = ZipScheduler(repo_id=UPLOAD_REPO_ID,
-                         repo_type='dataset',
-                         every=UPLOAD_FREQUENCY,
-                         private=not USE_PUBLIC_REPO,
-                         token=HF_TOKEN,
-                         folder_path=LOCAL_SAVE_DIR)
+scheduler = ParquetScheduler(repo_id=UPLOAD_REPO_ID,
+                             repo_type='dataset',
+                             every=UPLOAD_FREQUENCY,
+                             private=not USE_PUBLIC_REPO,
+                             token=HF_TOKEN,
+                             folder_path=LOCAL_SAVE_DIR)
 
 client = Client('stabilityai/stable-diffusion')
 
@@ -81,12 +54,11 @@ def generate(prompt: str) -> tuple[str, list[str]]:
         'negative_prompt': negative_prompt,
         'guidance_scale': guidance_scale,
     }
-    config_file = tempfile.NamedTemporaryFile(mode='w',
-                                              suffix='.json',
-                                              delete=False)
-    json.dump(config, config_file)
+    with tempfile.NamedTemporaryFile(mode='w', suffix='.json',
+                                     delete=False) as config_file:
+        json.dump(config, config_file)
 
-    with open(pathlib.Path(out_dir) / 'captions.json') as f:
+    with (pathlib.Path(out_dir) / 'captions.json').open() as f:
         paths = list(json.load(f).keys())
     return config_file.name, paths
 
@@ -100,21 +72,23 @@ def save_preference(config_path: str, gallery: list[dict[str, Any]],
     save_dir = LOCAL_SAVE_DIR / f'{uuid.uuid4()}'
     save_dir.mkdir(parents=True, exist_ok=True)
 
-    paths = [x['name'] for x in gallery]
-    with scheduler.lock:
-        for index, path in enumerate(paths):
-            ext = pathlib.Path(path).suffix
-            shutil.move(path, save_dir / f'{index:03d}{ext}')
-
-        with open(config_path) as f:
-            config = json.load(f)
-        json_path = save_dir / 'preferences.json'
-        with json_path.open('w') as f:
-            preferences = config | {
-                'selected_index': selected_index,
-                'timestamp': datetime.datetime.utcnow().isoformat(),
-            }
-            json.dump(preferences, f)
+    # Load config
+    with open(config_path) as f:
+        data = json.load(f)
+
+    # Add selected item + timestamp
+    data['selected_index'] = selected_index
+    data['timestamp'] = datetime.datetime.utcnow().isoformat()
+
+    # Copy and add images
+    for index, path in enumerate(x['name'] for x in gallery):
+        name = f'{index:03d}'
+        dst_path = save_dir / f'{name}{pathlib.Path(path).suffix}'
+        shutil.move(path, dst_path)
+        data[f'image_{name}'] = dst_path
+
+    # Send to scheduler
+    scheduler.append(data)
 
 
 def clear() -> tuple[dict, dict, dict]:
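
With this change, each saved preference becomes one row of a parquet file in the dataset repo instead of a zipped folder. A minimal sketch of reading the collected data back (hypothetical snippet, not part of this commit; it assumes the repo is readable with your token and that `datasets` picks up the features config embedded by the scheduler):

```python
from datasets import load_dataset

# Each scheduler push commits one `<uuid>.parquet` file to the repo root;
# `load_dataset` concatenates them into a single split.
ds = load_dataset('hysts-samples/sample-user-preferences', split='train')

row = ds[0]
print(row['prompt'], row['selected_index'])

# The `image_*` columns are declared as `Image` features in the parquet
# metadata, so they should come back as PIL images rather than raw bytes.
row['image_000'].save('first-preference.jpg')
```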
requirements.txt CHANGED
@@ -1,2 +1,3 @@
 git+https://github.com/huggingface/huggingface_hub@928e138
 gradio_client==0.2.7
+pyarrow==12.0.1
scheduler.py CHANGED
@@ -1,31 +1,83 @@
-import shutil
+import json
 import tempfile
 import uuid
+from pathlib import Path
+from typing import Any, Dict, List
 
+import pyarrow as pa
+import pyarrow.parquet as pq
 from huggingface_hub import CommitScheduler
 
 
-class ZipScheduler(CommitScheduler):
+class ParquetScheduler(CommitScheduler):
+    def append(self, row: Dict[str, Any]) -> None:
+        with self.lock:
+            if not hasattr(self, 'rows') or self.rows is None:  # type: ignore
+                self.rows = []
+            self.rows.append(row)
+
     def push_to_hub(self):
+        # Check for new rows to push
         with self.lock:
-            if not any(self.folder_path.iterdir()):
-                return
-            archive_file = tempfile.NamedTemporaryFile(suffix='.zip')
-            archive_name = archive_file.name.split('.')[
-                0]  # `make_archive` automatically appends `.zip`
-            shutil.make_archive(base_name=archive_name,
-                                format='zip',
-                                root_dir=self.folder_path.parent,
-                                base_dir=self.folder_path.name)
-            shutil.rmtree(self.folder_path, ignore_errors=True)
-            self.folder_path.mkdir(parents=True, exist_ok=True)
+            rows = self.rows
+            self.rows = None
+        if not rows:
+            return
+
+        # Load images + create 'features' config for datasets library
+        hf_features: dict[str, Dict] = {}
+        path_to_cleanup: List[Path] = []
+        for row in rows:
+            for key, value in row.items():
+                if 'image' in key:
+                    # It's an image: we load the bytes, define a special schema and remember to clean up the file
+                    # Note: could do the same with "Audio"
+                    image_path = Path(value)
+                    if image_path.is_file():
+                        row[key] = {
+                            'path': image_path.name,
+                            'bytes': image_path.read_bytes()
+                        }
+                        path_to_cleanup.append(image_path)
+                    if key not in hf_features:
+                        hf_features[key] = {'_type': 'Image'}
+                else:
+                    # Otherwise, do nothing special
+                    if key not in hf_features:
+                        hf_features[key] = {
+                            '_type': 'Value',
+                            'dtype': 'string'
+                        }
+
+        # Complete rows if needed
+        for row in rows:
+            for feature in hf_features:
+                if feature not in row:
+                    row[feature] = None
+
+        # Export items to Arrow format
+        table = pa.Table.from_pylist(rows)
+
+        # Add metadata (used by datasets library)
+        table = table.replace_schema_metadata(
+            {'huggingface': json.dumps({'info': {
+                'features': hf_features
+            }})})
+
+        # Write to parquet file
+        archive_file = tempfile.NamedTemporaryFile()
+        pq.write_table(table, archive_file.name)
 
+        # Upload
         self.api.upload_file(
             repo_id=self.repo_id,
             repo_type=self.repo_type,
             revision=self.revision,
-            path_in_repo=f'{uuid.uuid4()}.zip',
+            path_in_repo=f'{uuid.uuid4()}.parquet',
             path_or_fileobj=archive_file.name,
         )
 
+        # Cleanup
         archive_file.close()
+        for path in path_to_cleanup:
+            path.unlink(missing_ok=True)
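
For context, the schema metadata written in `push_to_hub` is what lets the `datasets` library reconstruct the column types. A quick local round trip (hypothetical snippet with a made-up row, not part of this commit) shows the embedded features config surviving the parquet write:

```python
import json
import tempfile

import pyarrow as pa
import pyarrow.parquet as pq

# A made-up row and features config mirroring what the scheduler builds.
rows = [{'prompt': 'an astronaut riding a horse', 'selected_index': '2'}]
features = {
    'prompt': {'_type': 'Value', 'dtype': 'string'},
    'selected_index': {'_type': 'Value', 'dtype': 'string'},
}

table = pa.Table.from_pylist(rows)
table = table.replace_schema_metadata(
    {'huggingface': json.dumps({'info': {'features': features}})})

with tempfile.NamedTemporaryFile(suffix='.parquet') as f:
    pq.write_table(table, f.name)
    # Reading the file back recovers the features config that `datasets`
    # uses to type the columns (e.g. decoding `Image` columns to PIL).
    meta = pq.read_table(f.name).schema.metadata
    print(json.loads(meta[b'huggingface']))
```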