diff cache.py @ 0:f40d05521dca draft default tip

planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit de01e3c02a26cd6353a6b9b6f8d1be44de8ccd54
author astroteam
date Fri, 25 Apr 2025 19:33:20 +0000
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cache.py	Fri Apr 25 19:33:20 2025 +0000
@@ -0,0 +1,132 @@
+import os
+import json
+
+from minio import Minio
+from minio.error import S3Error
+
+
+class CRbeamCache(object):
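+    """Cache of CRbeam simulation results stored in a MinIO (S3) bucket."""
+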
+    credentials_env_var = "S3_CREDENTIALS"
+    bucket_name = "crbeam"
+    chunk_size = 32 * 1024
+
+    # Default: public MinIO playground credentials (play.min.io)
+    endpoint = "play.min.io"
+    access_key = "Q3AM3UQ867SPQQA43P2F"
+    secret_key = "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"
+
+    def __init__(self) -> None:
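+        # Credentials may be overridden via a JSON object in $S3_CREDENTIALS:
+        # {"endpoint": ..., "access_key": ..., "secret_key": ...}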
+        credentials = os.getenv(self.credentials_env_var)
+        if credentials is not None:
+            credentials = json.loads(credentials)
+            self.endpoint = credentials["endpoint"]
+            self.access_key = credentials["access_key"]
+            self.secret_key = credentials["secret_key"]
+
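+        # NB: minio-py connects over HTTPS by default (secure=True).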
+        self.client = Minio(
+            self.endpoint,
+            access_key=self.access_key,
+            secret_key=self.secret_key,
+        )
+
+        # Create the bucket if it does not exist.
+        if not self.client.bucket_exists(self.bucket_name):
+            self.client.make_bucket(self.bucket_name)
+
+    def save(self, obj_name, file_path, append_mode=True, **params):
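+        """Upload file_path to the bucket and return the object path used.
+
+        params are stored as object metadata. With append_mode a "-<index>"
+        suffix is appended to obj_name, so repeated saves under the same
+        name accumulate instead of overwriting each other.
+        """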
+        if append_mode:
+            idx = 0
+            for _ in self.client.list_objects(
+                self.bucket_name, prefix=obj_name
+            ):
+                idx += 1
+            obj_path = obj_name + f"-{idx}"
+        else:
+            obj_path = obj_name
+        self.client.fput_object(
+            self.bucket_name, obj_path, file_path, metadata=params
+        )
+        return obj_path
+
+    def get_cache_size(self, prefix):
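+        """Sum the particle counts encoded in object names under prefix.
+
+        Object names are expected to look like "<prefix><N>-<index>",
+        where <N> is the number of simulated particles (see save()).
+        """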
+        size = 0
+        for obj in self.client.list_objects(self.bucket_name, prefix=prefix):
+            size += int(
+                obj.object_name[len(prefix):].split("-")[0]
+            )  # todo: load number of particles from metadata
+        return size
+
+    def load_file(self, output_path, obj_name):
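+        """Download the object obj_name from the bucket into output_path."""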
+        try:
+            response = self.client.get_object(self.bucket_name, obj_name)
+        except S3Error as err:
+            raise ValueError("object not found") from err
+        try:
+            # Read data from response.
+            with open(output_path, "wb") as file:
+                for d in response.stream(self.chunk_size):
+                    file.write(d)
+        finally:
+            response.close()
+            response.release_conn()
+
+    def load_results(self, output_path, prefix, skip_paths=()):
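+        """Append all objects under prefix to output_path.
+
+        Objects whose names appear in skip_paths are skipped.
+        """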
+        for obj in self.client.list_objects(self.bucket_name, prefix=prefix):
+            print("found", obj.object_name)
+            if obj.object_name not in skip_paths:
+                # Get data of an object.
+                response = self.client.get_object(
+                    self.bucket_name, obj.object_name
+                )
+                try:
+                    # Append the object data to a local file
+                    with open(output_path, "ab") as file:
+                        # todo: cut first line
+                        for d in response.stream(self.chunk_size):
+                            file.write(d)
+                finally:
+                    response.close()
+                    response.release_conn()
+
+    def delete_results(self, prefix):
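+        """Remove all objects under prefix from the bucket."""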
+        for obj in self.client.list_objects(self.bucket_name, prefix=prefix):
+            self.client.remove_object(self.bucket_name, obj.object_name)
+
+
+if __name__ == "__main__":
+    # Smoke test: upload a local CRbeam output file and read it back.
+    obj_name_prefix = "photon_z0.1_E_1e+07_1e+13_B1e-15_L0.05-5_N"
+    n_particles = 10000
+    root_path = "/Users/ok/git/mcray/bin/"
+    particle = "photon"
+    obj_name = obj_name_prefix + str(n_particles)
+    file_dir = root_path + obj_name + "/z0"
+    file_path = file_dir + "/" + particle
+    c = CRbeamCache()
+    c.save(obj_name, file_path, n_particles=n_particles)
+    print(file_path + " sent to s3")
+    loaded_file_path = file_path + ".loaded"
+    c.load_results(loaded_file_path, obj_name)
+    print(loaded_file_path + " loaded from s3")