DagsterDocs

Source code for dagster_azure.adls2.file_cache

from dagster import Field, Selector, StringSource, check, resource
from dagster.core.storage.file_cache import FileCache

from .file_manager import ADLS2FileHandle
from .utils import ResourceNotFoundError, create_adls2_client


class ADLS2FileCache(FileCache):
    """File cache that keeps its file objects in an ADLS2 file system.

    Every cache key is stored under ``<prefix>/<file_key>`` inside the
    configured file system, and is accessed through ``self.client``.
    """

    def __init__(
        self, storage_account, file_system, prefix, credential=None, overwrite=False, client=None
    ):
        """Set up the cache and the client it talks through.

        Args:
            storage_account: Storage account name; handed to
                ``create_adls2_client`` when no ``client`` is supplied.
            file_system: File system name passed to the client when looking
                up file clients.
            prefix: Base path prepended to every file key.
            credential: Forwarded to ``create_adls2_client``; unused when a
                truthy ``client`` is supplied.
            overwrite: Forwarded to the ``FileCache`` base initializer.
            client: Optional pre-built client to use instead of creating one.
        """
        super(ADLS2FileCache, self).__init__(overwrite=overwrite)

        self.storage_account = storage_account
        self.file_system = file_system
        self.prefix = prefix

        # Only build a client when the caller did not hand in a usable one.
        if not client:
            client = create_adls2_client(storage_account, credential)
        self.client = client

    def has_file_object(self, file_key):
        """Return True if a file exists for ``file_key``, False otherwise.

        Existence is probed by requesting the file's properties; a
        ``ResourceNotFoundError`` from that lookup is reported as "absent".
        Any other exception propagates to the caller.
        """
        check.str_param(file_key, "file_key")
        try:
            file_client = self.client.get_file_client(
                self.file_system, self.get_full_key(file_key)
            )
            file_client.get_file_properties()
        except ResourceNotFoundError:
            exists = False
        else:
            exists = True
        return exists

    def get_full_key(self, file_key):
        """Return the storage path for ``file_key``: the prefix, a slash, the key."""
        template = "{base_key}/{file_key}"
        return template.format(file_key=file_key, base_key=self.prefix)

    def write_file_object(self, file_key, source_file_object):
        """Upload ``source_file_object`` under ``file_key`` and return its handle.

        Args:
            file_key: Cache key (a string) identifying the file.
            source_file_object: Data handed as-is to the file client's
                ``upload_data``.

        Returns:
            The ``ADLS2FileHandle`` produced by ``get_file_handle(file_key)``.
        """
        check.str_param(file_key, "file_key")

        target_path = self.get_full_key(file_key)
        file_client = self.client.get_file_client(
            file_system=self.file_system, file_path=target_path
        )
        # The upload always replaces existing data, regardless of the
        # ``overwrite`` value given to the constructor.
        # NOTE(review): presumably callers consult the cache's overwrite flag
        # before calling this method -- confirm against the FileCache contract.
        file_client.upload_data(source_file_object, overwrite=True)
        return self.get_file_handle(file_key)

    def get_file_handle(self, file_key):
        """Return an ``ADLS2FileHandle`` pointing at the path for ``file_key``.

        The account name is read from the client rather than from
        ``self.storage_account``.
        """
        check.str_param(file_key, "file_key")
        account_name = self.client.account_name
        return ADLS2FileHandle(account_name, self.file_system, self.get_full_key(file_key))


@resource(
    {
        "storage_account": Field(StringSource, description="The storage account name."),
        "credential": Field(
            Selector(
                {
                    "sas": Field(StringSource, description="SAS token for the account."),
                    "key": Field(StringSource, description="Shared Access Key for the account"),
                }
            ),
            description="The credentials with which to authenticate.",
        ),
        "prefix": Field(StringSource, description="The base path prefix to use in ADLS2"),
        "file_system": Field(
            StringSource, description="The storage account filesystem (aka container)"
        ),
        "overwrite": Field(bool, is_required=False, default_value=False),
    }
)
def adls2_file_cache(init_context):
    """Resource that builds an ``ADLS2FileCache`` from its resource config.

    Args:
        init_context: Resource initialization context; its ``resource_config``
            supplies ``storage_account``, ``file_system``, ``prefix``,
            ``credential`` and ``overwrite``.

    Returns:
        An ``ADLS2FileCache`` constructed from those config values.
    """
    config = init_context.resource_config
    # NOTE(review): ``credential`` is passed through exactly as configured,
    # i.e. the Selector value keyed by "sas" or "key" -- confirm that
    # ``create_adls2_client`` accepts that shape rather than a bare token.
    return ADLS2FileCache(
        storage_account=config["storage_account"],
        file_system=config["file_system"],
        prefix=config["prefix"],
        credential=config["credential"],
        overwrite=config["overwrite"],
        # TODO: resource dependencies
    )