cache.py @ changeset 0:f40d05521dca

planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit de01e3c02a26cd6353a6b9b6f8d1be44de8ccd54
author: astroteam
date: Fri, 25 Apr 2025 19:33:20 +0000
import os
import json

from minio import Minio

# from minio.error import S3Error


class CRbeamCache(object):
    credentials_env_var = "S3_CREDENTIALS"
    bucket_name = "crbeam"
    chunk_size = 32 * 1024

    # default credentials
    endpoint = "play.min.io"
    access_key = "Q3AM3UQ867SPQQA43P2F"
    secret_key = "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"
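    # The defaults above point at MinIO's public "play" sandbox. Real
    # deployments are expected to override them via the S3_CREDENTIALS
    # environment variable, which must hold JSON with the keys read in
    # __init__ below, e.g. (placeholder values):
    # {"endpoint": "s3.example.org", "access_key": "...", "secret_key": "..."}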

    def __init__(self) -> None:
        credentials = os.getenv(self.credentials_env_var)
        if credentials is not None:
            credentials = json.loads(credentials)
            self.endpoint = credentials["endpoint"]
            self.access_key = credentials["access_key"]
            self.secret_key = credentials["secret_key"]

        self.client = Minio(
            self.endpoint,
            access_key=self.access_key,
            secret_key=self.secret_key,
        )

        # Create the bucket if it does not already exist.
        if not self.client.bucket_exists(self.bucket_name):
            self.client.make_bucket(self.bucket_name)

    def save(self, obj_name, file_path, append_mode=True, **params):
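        """Upload file_path to the bucket under obj_name, attaching params
        as object metadata.  With append_mode=True the object name gets a
        numeric suffix ("-0", "-1", ...) based on how many objects already
        share the prefix, so repeated saves do not overwrite each other."""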
        if append_mode:
            # Count existing objects with this prefix to pick the next suffix.
            idx = 0
            for _ in self.client.list_objects(
                self.bucket_name, prefix=obj_name
            ):
                idx += 1
            obj_path = obj_name + f"-{idx}"
        else:
            obj_path = obj_name
        self.client.fput_object(
            self.bucket_name, obj_path, file_path, metadata=params
        )
        return obj_path

    def get_cache_size(self, prefix):
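        """Sum the particle counts encoded in the cached object names.

        Object names are expected to look like "<prefix><count>-<index>";
        the integer between the prefix and the "-<index>" suffix is summed.
        (The original code carries a todo to read this from metadata.)"""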
        size = 0
        for obj in self.client.list_objects(self.bucket_name, prefix=prefix):
            size += int(
                obj.object_name[len(prefix):].split("-")[0]
            )  # todo: load number of particles from metadata
        return size

    def load_file(self, output_path, obj_name):
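        """Download a single object to output_path, raising ValueError if
        the object cannot be fetched."""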
        try:
            response = self.client.get_object(self.bucket_name, obj_name)
        except Exception as err:
            raise ValueError("object not found") from err
        try:
            # Stream the response to the local file in chunks.
            with open(output_path, "wb") as file:
                for d in response.stream(self.chunk_size):
                    file.write(d)
        finally:
            response.close()
            response.release_conn()

    def load_results(self, output_path, prefix, skip_paths=()):
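        """Concatenate every object stored under prefix into output_path,
        skipping object names listed in skip_paths."""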
        for obj in self.client.list_objects(self.bucket_name, prefix=prefix):
            print("found", obj.object_name)
            if obj.object_name not in skip_paths:
                # Get the object's data.
                response = self.client.get_object(
                    self.bucket_name, obj.object_name
                )
                try:
                    # Append the object data to the local file.
                    with open(output_path, "ab") as file:
                        # todo: cut first line
                        for d in response.stream(self.chunk_size):
                            file.write(d)
                finally:
                    response.close()
                    response.release_conn()

    def delete_results(self, prefix):
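        """Remove every object stored under prefix from the bucket."""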
        for obj in self.client.list_objects(self.bucket_name, prefix=prefix):
            self.client.remove_object(self.bucket_name, obj.object_name)


if __name__ == "__main__":
    # test code (assumes a locally produced CRbeam output file at file_path)
    obj_name_prefix = "photon_z0.1_E_1e+07_1e+13_B1e-15_L0.05-5_N"
    n_particles = 10000
    root_path = "/Users/ok/git/mcray/bin/"
    particle = "photon"
    obj_name = obj_name_prefix + str(n_particles)
    file_dir = root_path + obj_name + "/z0"
    file_path = file_dir + "/" + particle
    c = CRbeamCache()
    c.save(obj_name, file_path, n_particles=n_particles)
    print(file_path + " sent to s3")
    loaded_file_path = file_path + ".loaded"
    c.load_results(loaded_file_path, obj_name)
    print(loaded_file_path + " loaded from s3")
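    # Optional cleanup sketch (not part of the original test): removing the
    # uploaded objects would let repeated runs start from an empty prefix.
    # c.delete_results(obj_name)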