Commit ec4777cd authored by Ottomata

Merge branch 'ci' into 'main'

Error for mypy in CI

See merge request repos/data-engineering/workflow_utils!13
parents 093e3d50 3b1b1431
......@@ -2,13 +2,10 @@
files = workflow_utils
ignore_missing_imports = True
ignore_errors = True
follow_imports = silent
# method signatures should be typed
disallow_untyped_defs = True
# disallows usage of types that come from unfollowed imports
disallow_any_unimported = True
# <- Explicit is better than implicit. Open to debate :)
no_implicit_optional = True
# Type-checks the interior of functions without type annotations.
......
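Note: the stricter flags above are what the annotations added throughout this merge request satisfy. As a quick illustration (hypothetical code, not from this repo), disallow_untyped_defs rejects the first function below, while the second satisfies both it and no_implicit_optional:

from typing import Optional

# Rejected: "error: Function is missing a type annotation"
def add(a, b):
    return a + b

# Passes: the None default is spelled as an explicit Optional[...]
def greet(name: Optional[str] = None) -> str:
    return f'hello {name or "world"}'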
......@@ -29,12 +29,11 @@ console_scripts =
test =
pytest ==6.2.*
pytest-cov ==3.*
flake8 ==3.9.*
mypy ==0.931
# Apparently there is a bug in flake8 4+?
# Downgrade to flake8 3 for now.
# https://githubmemory.com/repo/tholo/pytest-flake8/issues/81
flake8 ==3.9.*
flake8 ==4.0.*
pylint ==2.12.*
docs =
......
from typing import List, Sequence, Optional
from yamlreader import yaml_load # type: ignore
from yamlreader import yaml_load
from workflow_utils.artifact.locator import ArtifactLocator
from workflow_utils.artifact.source import ArtifactSource
......@@ -47,24 +47,24 @@ class Artifact():
super().__init__()
def __repr__(self):
def __repr__(self) -> str:
return f'{self.__class__.__name__}({self.name})'
def __str__(self):
def __str__(self) -> str:
locators_desc = [
f'{locator.url(self.id)}\t(exists={locator.exists(self.id)})'
for locator in self.locators()
]
return self.__repr__() + ":\n\t" + "\n\t".join(locators_desc)
def cache_put(self, force: bool = False):
def cache_put(self, force: bool = False) -> None:
"""
Puts the artifact in all of its cache locations.
"""
for cache in self.caches:
cache.put(self.id, self.source.open(self.id), force=force)
def cache_delete(self):
def cache_delete(self) -> None:
"""
Deletes the artifact from all of its cache locations.
"""
......@@ -156,7 +156,7 @@ class Artifact():
default_source_name: Optional[str] = None,
default_cache_names: Optional[List[str]] = None,
name: Optional[str] = None,
):
) -> 'Artifact':
"""
Instantiates an Artifact from a config dict.
......@@ -232,7 +232,7 @@ class Artifact():
)
@classmethod
def load_artifacts_from_config(cls, config) -> dict:
def load_artifacts_from_config(cls, config: dict) -> dict:
"""
Parses the config to instantiate ArtifactSources, ArtifactCaches
and Artifacts declared with those sources and caches.
......
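Note on the quoted return annotation -> 'Artifact': the class name is not bound yet while the class body is being evaluated, so a string forward reference is the standard workaround. A minimal standalone sketch of the pattern:

class Node:
    @classmethod
    def default(cls) -> 'Node':
        # 'Node' is quoted because the name only exists once the
        # class body has finished executing.
        return cls()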
from typing import Any
from abc import abstractmethod
import os
from urllib.parse import urlparse
import fsspec # type: ignore
import fsspec
from fsspec import AbstractFileSystem
from workflow_utils.util import safe_filename
from workflow_utils.artifact.maven import maven_artifact_uri
......@@ -45,7 +48,7 @@ class ArtifactCache(ArtifactLocator):
return safe_filename(artifact_id)
@abstractmethod
def open(self, artifact_id: str):
def open(self, artifact_id: str) -> Any:
"""
Returns a file-like object at the cache URL for writing.
......@@ -53,7 +56,7 @@ class ArtifactCache(ArtifactLocator):
"""
@abstractmethod
def put(self, artifact_id, input_stream, force=False):
def put(self, artifact_id: str, input_stream: Any, force: bool = False) -> None:
"""
Put input in cache with artifact name.
......@@ -63,7 +66,7 @@ class ArtifactCache(ArtifactLocator):
"""
@abstractmethod
def delete(self, artifact_id: str):
def delete(self, artifact_id: str) -> None:
"""
Delete artifact from cache.
......@@ -85,10 +88,10 @@ class FsArtifactCache(ArtifactCache):
self.base_uri = base_uri
super().__init__()
def __repr__(self):
def __repr__(self) -> str:
return f'{self.__class__.__name__}({self.base_uri})'
def fs(self) -> fsspec.AbstractFileSystem: # pylint: disable=invalid-name
def fs(self) -> AbstractFileSystem: # pylint: disable=invalid-name
"""
We don't keep a reference to the FileSystem used by
this cache. The FileSystem can be obtained by calling
......@@ -100,16 +103,16 @@ class FsArtifactCache(ArtifactCache):
def url(self, artifact_id: str) -> str:
return os.path.join(self.base_uri, self.cache_key(artifact_id))
def exists(self, artifact_id: str):
def exists(self, artifact_id: str) -> bool:
url_parsed = urlparse(self.url(artifact_id))
return self.fs().exists(url_parsed.path)
return bool(self.fs().exists(url_parsed.path))
def open(self, artifact_id: str):
def open(self, artifact_id: str) -> Any:
url = self.url(artifact_id)
self.log.debug(f'Opening {artifact_id} cache {url} for writing.')
return fsspec.open(url, mode='wb').open() # pylint: disable=no-member
def put(self, artifact_id: str, input_stream, force=False):
def put(self, artifact_id: str, input_stream: Any, force: bool = False) -> None:
if force or not self.exists(artifact_id):
self.log.debug(f'Cache put of {artifact_id} (force={force})')
with self.open(artifact_id) as output:
......@@ -118,7 +121,7 @@ class FsArtifactCache(ArtifactCache):
else:
self.log.debug(f'{artifact_id} is already cached.')
def delete(self, artifact_id: str):
def delete(self, artifact_id: str) -> None:
if self.exists(artifact_id):
self.log.debug(f'Cache delete of {artifact_id}')
self.fs().rm(self.url(artifact_id))
......
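Note on `return bool(self.fs().exists(url_parsed.path))`: fsspec ships without type stubs, so its results are effectively untyped; wrapping them in bool() makes the declared -> bool return type hold for mypy and at runtime alike. A minimal sketch of the pattern, with a stand-in for the un-stubbed call:

def exists_untyped(path):  # stand-in for an un-stubbed third-party call
    return 1 if path else 0  # truthy, but not an actual bool

def exists(path: str) -> bool:
    # bool() converts the loosely typed result to a real bool.
    return bool(exists_untyped(path))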
from typing import List
import os
import sys
from docopt import docopt # type: ignore
from docopt import docopt
from workflow_utils.artifact import Artifact
from workflow_utils.util import setup_logging, fsspec_use_new_pyarrow_api
def main(argv=sys.argv[1:]): # pylint: disable=dangerous-default-value
def main(argv: List[str] = sys.argv[1:]) -> None: # pylint: disable=dangerous-default-value
script_name = os.path.basename(sys.argv[0])
doc = f"""
Usage: {script_name} status <artifact_config_files>...
......
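Note on the pylint disable: the default argv: List[str] = sys.argv[1:] is computed once, at import time. A common alternative (a sketch, not what this commit does) defers the lookup to call time:

import sys
from typing import List, Optional

def main(argv: Optional[List[str]] = None) -> None:
    if argv is None:
        argv = sys.argv[1:]  # resolved per call, not at import
    print(argv)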
......@@ -2,6 +2,8 @@
Functions to help working with Maven coordinate strings.
"""
from typing import Union
import os
......@@ -44,7 +46,7 @@ def parse_maven_coordinate(coordinate: str) -> dict:
}
def maven_artifact_filename(coordinate) -> str:
def maven_artifact_filename(coordinate: Union[str, dict]) -> str:
"""
Gets the filename in a maven repository for the maven coordinate.
......@@ -62,7 +64,7 @@ def maven_artifact_filename(coordinate) -> str:
return filename
def maven_artifact_dir(coordinate) -> str:
def maven_artifact_dir(coordinate: Union[str, dict]) -> str:
"""
Gets the directory path in a maven repository for the maven coordinate.
......@@ -74,10 +76,11 @@ def maven_artifact_dir(coordinate) -> str:
coordinate['artifact_id'],
coordinate['version']
]
return os.path.join(*path_parts)
return str(os.path.join(*path_parts))
def maven_artifact_uri(coordinate, repo_uri: str = '') -> str:
def maven_artifact_uri(coordinate: Union[str, dict], repo_uri: str = '') -> str:
"""
Gets the artifact URI to a file in a maven repository.
......
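Note: the Union[str, dict] annotations record that these helpers accept either a raw coordinate string or the parsed dict with group_id, artifact_id, and version keys seen above. A rough illustration (the coordinate value is made up):

coordinate = 'org.example:my-lib:1.0.0'  # hypothetical group:artifact:version
group_id, artifact_id, version = coordinate.split(':')
parsed = {
    'group_id': group_id,
    'artifact_id': artifact_id,
    'version': version,
}
# Both maven_artifact_dir(coordinate) and maven_artifact_dir(parsed)
# type-check under the new signatures.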
from typing import Any
from abc import abstractmethod
import os
from urllib.parse import urlparse
import fsspec # type: ignore
import fsspec
from workflow_utils.artifact.locator import ArtifactLocator
from workflow_utils.artifact.maven import maven_artifact_uri
......@@ -12,7 +14,7 @@ class ArtifactSource(ArtifactLocator):
An ArtifactLocator that can open artifacts for reading.
"""
@abstractmethod
def open(self, artifact_id: str):
def open(self, artifact_id: str) -> Any:
"""
Returns a read-only file-like IO stream object for artifact_id.
......@@ -38,7 +40,7 @@ class FsArtifactSource(ArtifactSource):
self.base_uri = base_uri
super().__init__()
def __repr__(self):
def __repr__(self) -> str:
return f'{self.__class__.__name__}({self.base_uri})'
def url(self, artifact_id: str) -> str:
......@@ -50,7 +52,7 @@ class FsArtifactSource(ArtifactSource):
"""
return os.path.join(self.base_uri, artifact_id)
def open(self, artifact_id: str):
def open(self, artifact_id: str) -> Any:
"""
:param artifact_id: a URI
"""
......@@ -64,7 +66,7 @@ class FsArtifactSource(ArtifactSource):
"""
url = self.url(artifact_id)
url_parsed = urlparse(url)
return fsspec.open(url).fs.exists(url_parsed.path)
return bool(fsspec.open(url).fs.exists(url_parsed.path))
class MavenArtifactSource(FsArtifactSource):
......
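Note: annotating open as -> Any is the pragmatic choice while fsspec has no type stubs. A stricter alternative (an assumption, not what this commit does) would be typing.IO:

from typing import IO

def open_artifact(path: str) -> IO[bytes]:
    # A plain local file stands in here for an fsspec-backed stream.
    return open(path, 'rb')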
from typing import Optional, List
from typing import Optional, List, Any
import os
import sys
import logging
import shutil
import subprocess
import conda_pack # type: ignore
from docopt import docopt # type: ignore
import conda_pack
from docopt import docopt
from workflow_utils.util import setup_logging, filter_files_exist
......@@ -25,7 +25,7 @@ Default kwargs to pass to conda_pack.pack.
"""
def pack(**conda_pack_kwargs) -> str:
def pack(**conda_pack_kwargs: Any) -> str:
"""
Calls conda_pack.pack.
If the packed output file already exists, this will not repackage
......@@ -55,12 +55,12 @@ def pack(**conda_pack_kwargs) -> str:
"force=True in conda_pack_kwargs and it will be repacked for you.",
conda_packed_file
)
return conda_packed_file
return str(conda_packed_file)
# NOTE: If no conda env is currently active, and kwargs
# doesn"t contain information about what env to pack (i.e. no name or prefix)
# then this will raise an error.
return conda_pack.pack(**kwargs)
return str(conda_pack.pack(**kwargs))
def find_conda_exe(conda_exe: Optional[str] = None) -> Optional[str]:
......@@ -143,9 +143,7 @@ def find_pip_exe(
return None
def _run(
command: List[str],
):
def _run(command: List[str]) -> int:
"""
Runs command via subprocess.run.
prints stdout and stderr in the caller process,
......@@ -170,7 +168,7 @@ def _run(
def conda_cli(
args: Optional[List[str]] = None,
conda_exe: Optional[str] = None,
):
) -> int:
"""
Shells out to conda_exe to run a command.
......@@ -200,7 +198,7 @@ def conda_cli(
def pip_cli(
args: Optional[List[str]] = None,
pip_exe: Optional[str] = None,
):
) -> int:
"""
Shells out to pip_exe to run a command.
......@@ -213,8 +211,6 @@ def pip_cli(
Set PIP_EXE environment variable to configure
this externally.
:prefix:
Possible
:return:
return code of pip command
......@@ -234,7 +230,7 @@ def conda_env_update(
prefix: str,
args: Optional[List[str]] = None,
conda_exe: Optional[str] = None,
):
) -> int:
"""
Shells out to a conda env update command that updates prefix.
......@@ -247,6 +243,9 @@ def conda_env_update(
:param conda_exe:
conda_exe param to conda_cli
:return:
return code of conda cli command
"""
if args is None:
args = []
......@@ -256,9 +255,18 @@ def conda_env_update(
def conda_create(
prefix: str,
args: Optional[List[str]] = None
) -> str:
) -> int:
"""
Creates a conda environment at prefix.
:param prefix:
Path to conda env to create.
:param args:
Args to pass to conda create
:return:
return code of conda cli command
"""
if args is None:
args = []
......@@ -268,9 +276,18 @@ def conda_create(
def conda_install(
prefix: str,
args: Optional[List[str]] = None,
):
) -> int:
"""
Calls conda install on prefix.
:param prefix:
Path to conda env to install into.
:param args:
Args to pass to conda install
:return:
return code of conda cli command
"""
if args is None:
args = []
......@@ -280,7 +297,7 @@ def conda_install(
def pip_install(
prefix: str,
args: Optional[List[str]] = None,
):
) -> int:
"""
Shells out to pip install to install into prefix.
......@@ -293,6 +310,8 @@ def pip_install(
:param pip_exe:
pip_exe param to pip_cli
:return:
return code of pip command
"""
if args is None:
args = []
......@@ -445,7 +464,7 @@ def conda_create_dist_env(
def conda_create_and_pack_dist_env(
dist_env_prefix: str,
dist_env_dest: str,
**build_kwargs
**build_kwargs: Any
) -> str:
"""
Calls conda_create_dist_env and then uses conda-pack to create
......@@ -457,6 +476,10 @@ def conda_create_and_pack_dist_env(
dist_env_prefix = os.path.realpath(dist_env_prefix)
conda_create_dist_env(dist_env_prefix, **build_kwargs)
# NOTE: Maybe get the project name and version out of the dist env
# by using importlib.metadata?
return pack(
prefix=dist_env_prefix,
output=dist_env_dest,
......@@ -464,7 +487,7 @@ def conda_create_and_pack_dist_env(
)
def conda_dist(argv: Optional[List[str]] = None):
def conda_dist(argv: Optional[List[str]] = None) -> None:
if argv is None:
argv = sys.argv[1:]
script_name = os.path.basename(sys.argv[0])
......
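Note: **conda_pack_kwargs: Any annotates each keyword value, not the mapping itself; inside the function, mypy types the kwargs dict as Dict[str, Any]. A tiny demonstration:

from typing import Any

def pack_sketch(**kwargs: Any) -> str:
    # kwargs is a Dict[str, Any]; each individual value is Any.
    return ', '.join(f'{k}={v!r}' for k, v in sorted(kwargs.items()))

print(pack_sketch(prefix='/tmp/env', force=True))
# force=True, prefix='/tmp/env'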
from typing import List
from typing import List, Any, Optional
import logging
import importlib
......@@ -23,7 +23,7 @@ class LogHelper(): # pylint: disable=too-few-public-methods
self.log.info("my message")
"""
def __init__(self):
def __init__(self) -> None:
"""
Creates a class instance specific logger as self.log using __repr__.
Your implementing class should probably call
......@@ -36,11 +36,11 @@ class LogHelper(): # pylint: disable=too-few-public-methods
{'instance_repr': self.__repr__()}
)
def __repr__(self):
def __repr__(self) -> str:
return self.__class__.__name__
def safe_filename(name: str, maxlen: int = 255):
def safe_filename(name: str, maxlen: int = 255) -> str:
"""
Given a string, returns a 'safe' filename.
......@@ -49,7 +49,7 @@ def safe_filename(name: str, maxlen: int = 255):
"""
keep_chars = ['.', '_', '-']
def safe_char(char):
def safe_char(char: str) -> str:
if char.isalnum() or char in keep_chars:
return char
return "_"
......@@ -61,7 +61,7 @@ def filter_files_exist(files: List[str]) -> List[str]:
return [file for file in files if os.path.exists(file)]
def instantiate(class_name: str, **args):
def instantiate(class_name: str, **args: Any) -> Any:
"""
Factory method for instantiating classes from
kwargs. class_name is expected to be a fully qualified class name of a
......@@ -97,7 +97,7 @@ def instantiate(class_name: str, **args):
return cls(**args)
def instantiate_all(instances_config):
def instantiate_all(instances_config: dict) -> dict:
"""
Given a dict of instance config keyed by name,
call instantiate() on each of them, and return
......@@ -120,7 +120,7 @@ def instantiate_all(instances_config):
}
def find_java_home():
def find_java_home() -> Optional[str]:
"""
Looks for first JAVA_HOME that exists using JAVA_HOME,
bigtop-detect-javahome and java -XshowSettings:properties
......@@ -148,7 +148,7 @@ def find_java_home():
return None
def find_hadoop_home():
def find_hadoop_home() -> str:
"""
Looks for first hadoop home that exists in HADOOP_HOME
and other possible locations,
......@@ -160,21 +160,20 @@ def find_hadoop_home():
return next(filter(os.path.exists, possibilities), '')
def find_hadoop_exec():
def find_hadoop_exec() -> Optional[str]:
"""
Looks for 'hadoop' executable in result of find_hadoop_home()
or shutil.which('hadoop')
"""
hadoop: Optional[str] = None
if os.path.exists(os.path.join(find_hadoop_home(), 'bin', 'hadoop')):
hadoop = os.path.join(find_hadoop_home(), 'bin', 'hadoop')
elif shutil.which('hadoop'):
hadoop = shutil.which('hadoop')
else:
hadoop = None
return hadoop
def get_hadoop_classpath():
def get_hadoop_classpath() -> Optional[str]:
"""
Uses result of find_hadoop_exec() to call
`hadoop classpath` and returns result.
......@@ -190,7 +189,7 @@ def get_hadoop_classpath():
return hadoop_classpath
def set_hadoop_env_vars(force: bool = False):
def set_hadoop_env_vars(force: bool = False) -> None:
"""
Sets pyarrow-related Hadoop environment variables if they aren't already set.
This automates pyarrow (and fsspec) using the newer pyarrow HDFS API without
......@@ -219,7 +218,7 @@ def set_hadoop_env_vars(force: bool = False):
os.environ[env_var] = value
def fsspec_use_new_pyarrow_api(should_set_hadoop_env_vars: bool = True):
def fsspec_use_new_pyarrow_api(should_set_hadoop_env_vars: bool = True) -> None:
"""
Registers hdfs:// filesystems to always use the arrow_hdfs://
implementation. This uses the newer pyarrow HDFS API.
......@@ -241,7 +240,7 @@ def fsspec_use_new_pyarrow_api(should_set_hadoop_env_vars: bool = True):
# NOTE: i dunno about this.
def setup_logging(level=None):
def setup_logging(level: Optional[str] = None) -> None:
"""
Configures basic logging defaults.
If level is not given, but the environment variable LOG_LEVEL
......@@ -272,7 +271,7 @@ def setup_logging(level=None):
# All log records also need to have an instance_repr entry.
# Add a logging filter to the root logger's handlers to
# make sure that instance_repr is set if a child logger hasn't set it.
def inject_instance_name(record):
def inject_instance_name(record: Any) -> bool:
if not hasattr(record, 'instance_repr'):
record.instance_repr = record.name
return True
......
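Note: combining safe_char above with the elided join-and-truncate body (reconstructed here as an assumption, not copied from the repo), safe_filename behaves like this:

def safe_filename(name: str, maxlen: int = 255) -> str:
    keep_chars = ['.', '_', '-']

    def safe_char(char: str) -> str:
        if char.isalnum() or char in keep_chars:
            return char
        return '_'

    # Assumed reconstruction of the elided body.
    return ''.join(safe_char(c) for c in name)[:maxlen]

print(safe_filename('org/example my-lib:1.0.jar'))
# org_example_my-lib_1.0.jar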