Mercurial > repos > astroteam > crbeam_astro_tool
comparison cache.py @ 0:f40d05521dca draft default tip
planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit de01e3c02a26cd6353a6b9b6f8d1be44de8ccd54
| author | astroteam |
|---|---|
| date | Fri, 25 Apr 2025 19:33:20 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:f40d05521dca |
|---|---|
| 1 import os | |
| 2 import json | |
| 3 | |
| 4 from minio import Minio | |
| 5 | |
| 6 # from minio.error import S3Error | |
| 7 | |
| 8 | |
class CRbeamCache(object):
    """Cache of simulation result files kept in an S3 bucket via MinIO.

    Connection settings are read as a JSON object from the environment
    variable named by ``credentials_env_var`` (keys ``endpoint``,
    ``access_key`` and ``secret_key``).  When that variable is unset the
    class-level defaults below are used.
    """

    credentials_env_var = "S3_CREDENTIALS"
    bucket_name = "crbeam"
    # Amount passed to ``response.stream()`` for each read while downloading.
    chunk_size = 32 * 1024

    # default credentials
    # NOTE(review): these appear to be the public MinIO playground
    # credentials -- confirm, and do not rely on them for real data.
    endpoint = "play.min.io"
    access_key = "Q3AM3UQ867SPQQA43P2F"
    secret_key = "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"

    def __init__(self) -> None:
        """Create the MinIO client and make sure the bucket exists."""
        credentials = os.getenv(self.credentials_env_var)
        if credentials is not None:
            credentials = json.loads(credentials)
            # Instance attributes shadow the class-level defaults.
            self.endpoint = credentials["endpoint"]
            self.access_key = credentials["access_key"]
            self.secret_key = credentials["secret_key"]

        self.client = Minio(
            self.endpoint,
            access_key=self.access_key,
            secret_key=self.secret_key,
        )

        # Make bucket if not exist.
        if not self.client.bucket_exists(self.bucket_name):
            self.client.make_bucket(self.bucket_name)

    def _next_index(self, obj_name):
        """Return the first unused numeric suffix for ``obj_name``.

        Only objects named exactly ``<obj_name>-<integer>`` are considered,
        so objects whose names merely start with ``obj_name`` do not
        inflate the index, and a gap left by a deleted object never causes
        an existing object to be overwritten.
        """
        stem = obj_name + "-"
        listing = self.client.list_objects(self.bucket_name, prefix=stem)
        suffixes = (obj.object_name[len(stem):] for obj in listing)
        taken = [int(suffix) for suffix in suffixes if suffix.isdecimal()]
        return max(taken, default=-1) + 1

    def _stream_to_file(self, response, output_path, mode):
        """Write the body of ``response`` to ``output_path``.

        ``mode`` is the ``open()`` mode (``"wb"`` to overwrite, ``"ab"`` to
        append).  The response is always closed and its connection
        released, even when writing fails.
        """
        try:
            with open(output_path, mode) as file:
                for chunk in response.stream(self.chunk_size):
                    file.write(chunk)
        finally:
            response.close()
            response.release_conn()

    def save(self, obj_name, file_path, append_mode=True, **params):
        """Upload ``file_path`` to the bucket and return the object name used.

        In append mode the object is stored as ``<obj_name>-<index>`` with
        the next free index, so earlier uploads are kept.  Otherwise it is
        stored as ``obj_name`` itself.  Extra keyword arguments are passed
        to the client as object metadata.
        """
        if append_mode:
            obj_path = f"{obj_name}-{self._next_index(obj_name)}"
        else:
            obj_path = obj_name
        self.client.fput_object(
            self.bucket_name, obj_path, file_path, metadata=params
        )
        return obj_path

    def get_cache_size(self, prefix):
        """Return the sum of the integers encoded in matching object names.

        For every object whose name starts with ``prefix``, the text between
        the prefix and the first ``-`` is parsed as an integer.  Raises
        ``ValueError`` if that text is not an integer.
        """
        # todo: load number of particles from metadata
        return sum(
            int(obj.object_name[len(prefix):].split("-")[0])
            for obj in self.client.list_objects(
                self.bucket_name, prefix=prefix
            )
        )

    def load_file(self, output_path, obj_name):
        """Download object ``obj_name`` into ``output_path`` (overwriting it).

        Raises ``ValueError("object not found")`` when the object cannot be
        fetched; the underlying client error is attached as ``__cause__``.
        """
        try:
            response = self.client.get_object(self.bucket_name, obj_name)
        except Exception as err:
            raise ValueError("object not found") from err
        self._stream_to_file(response, output_path, "wb")

    def load_results(self, output_path, prefix, skip_paths=()):
        """Append every object whose name starts with ``prefix`` to a file.

        Objects whose names are listed in ``skip_paths`` are left out.  Data
        is appended to ``output_path`` in the order the client lists the
        objects.
        """
        for obj in self.client.list_objects(self.bucket_name, prefix=prefix):
            print("found", obj.object_name)
            if obj.object_name in skip_paths:
                continue
            response = self.client.get_object(
                self.bucket_name, obj.object_name
            )
            # todo: cut first line
            self._stream_to_file(response, output_path, "ab")

    def delete_results(self, prefix):
        """Remove every object whose name starts with ``prefix``."""
        for obj in self.client.list_objects(self.bucket_name, prefix=prefix):
            self.client.remove_object(self.bucket_name, obj.object_name)

    # Original (misspelled) name, kept so existing callers keep working.
    detete_results = delete_results
| 96 | |
| 97 | |
if __name__ == "__main__":
    # Manual smoke test: upload one simulation output file, then read every
    # cached object sharing its name back into a local ".loaded" file.
    n_particles = 10000
    particle = "photon"
    root_path = "/Users/ok/git/mcray/bin/"
    obj_name_prefix = "photon_z0.1_E_1e+07_1e+13_B1e-15_L0.05-5_N"

    # The object name is the prefix followed by the particle count.
    obj_name = f"{obj_name_prefix}{n_particles}"
    file_dir = f"{root_path}{obj_name}/z0"
    file_path = f"{file_dir}/{particle}"
    loaded_file_path = f"{file_path}.loaded"

    cache = CRbeamCache()

    cache.save(obj_name, file_path, n_particles=n_particles)
    print(f"{file_path} sent to s3")

    cache.load_results(loaded_file_path, obj_name)
    print(f"{loaded_file_path} loaded from s3")
