view cache.py @ 0:f40d05521dca draft default tip

planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit de01e3c02a26cd6353a6b9b6f8d1be44de8ccd54
author astroteam
date Fri, 25 Apr 2025 19:33:20 +0000
parents
children
line wrap: on
line source

import os
import json

from minio import Minio

# from minio.error import S3Error


class CRbeamCache(object):
    credentials_env_var = "S3_CREDENTIALS"
    bucket_name = "crbeam"
    chunk_size = 32 * 1024

    # default credentials
    endpoint = "play.min.io"
    access_key = "Q3AM3UQ867SPQQA43P2F"
    secret_key = "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"

    def __init__(self) -> None:
        credentials = os.getenv(self.credentials_env_var)
        if credentials is not None:
            credentials = json.loads(credentials)
            self.endpoint = credentials["endpoint"]
            self.access_key = credentials["access_key"]
            self.secret_key = credentials["secret_key"]

        self.client = Minio(
            self.endpoint,
            access_key=self.access_key,
            secret_key=self.secret_key,
        )

        # Make bucket if not exist.
        if not self.client.bucket_exists(self.bucket_name):
            self.client.make_bucket(self.bucket_name)

    def save(self, obj_name, file_path, append_mode=True, **params):
        if append_mode:
            idx = 0
            for _ in self.client.list_objects(
                self.bucket_name, prefix=obj_name
            ):
                idx += 1
            obj_path = obj_name + f"-{idx}"
        else:
            obj_path = obj_name
        self.client.fput_object(
            self.bucket_name, obj_path, file_path, metadata=params
        )
        return obj_path

    def get_cache_size(self, prefix):
        size = 0
        for obj in self.client.list_objects(self.bucket_name, prefix=prefix):
            size += int(
                obj.object_name[len(prefix):].split("-")[0]
            )  # todo: load number of particles from metadata
        return size

    def load_file(self, output_path, obj_name):
        try:
            response = self.client.get_object(self.bucket_name, obj_name)
        except Exception:
            raise ValueError("object not found")
        try:
            # Read data from response.
            with open(output_path, "wb") as file:
                for d in response.stream(self.chunk_size):
                    file.write(d)
        finally:
            response.close()
            response.release_conn()

    def load_results(self, output_path, prefix, skip_paths=[]):
        for obj in self.client.list_objects(self.bucket_name, prefix=prefix):
            print("found", obj.object_name)
            if obj.object_name not in skip_paths:
                # Get data of an object.
                response = self.client.get_object(
                    self.bucket_name, obj.object_name
                )
                try:
                    # Append the object data to a local file
                    with open(output_path, "ab") as file:
                        # todo: cut first line
                        for d in response.stream(self.chunk_size):
                            file.write(d)
                    # Read data from response.
                finally:
                    response.close()
                    response.release_conn()

    def detete_results(self, prefix):
        for obj in self.client.list_objects(self.bucket_name, prefix=prefix):
            self.client.remove_object(self.bucket_name, obj.object_name)


if __name__ == "__main__":
    # test code
    obj_name_prefix = "photon_z0.1_E_1e+07_1e+13_B1e-15_L0.05-5_N"
    n_particles = 10000
    root_path = "/Users/ok/git/mcray/bin/"
    particle = "photon"
    obj_name = obj_name_prefix + str(n_particles)
    file_dir = root_path + obj_name + "/z0"
    file_path = file_dir + "/" + particle
    c = CRbeamCache()
    c.save(obj_name, file_path, n_particles=n_particles)
    print(file_path + " sent to s3")
    loaded_file_path = file_path + ".loaded"
    c.load_results(loaded_file_path, obj_name)
    print(loaded_file_path + " loaded from s3")