机器学习离线系统中常用的工具代码

  |  

摘要: 机器学习离线 Pipeline 中的常用工具

【对数据分析、人工智能、金融科技、风控服务感兴趣的同学,欢迎关注我哈,阅读更多原创文章】
我的网站:潮汐朝夕的生活实验室
我的公众号:潮汐朝夕
我的知乎:潮汐朝夕
我的github:FennelDumplings
我的leetcode:FennelDumplings


在互联网产品中,一个完整的机器学习的业务闭环中,一般涉及两个系统,一个在线系统,一个离线系统。

本文记录一下离线系统中常用的工具代码,也就是经常放在 utils 目录中的代码。

  • attr_dict.py 自定义字典
  • config.py 配置文件
  • logger.py 日志
  • timer.py 计时器
  • path.py 路径管理

自定义字典

  • attr_dict.py 自定义字典
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import sys

class AttrDict(dict):
    """Dictionary whose items can also be read and written as attributes.

    AttrDict(mapping)  -> AttrDict({'one': 1, 'two': 2})
    AttrDict(iterable) -> AttrDict([('one', 1), ('two', 2)])
    AttrDict(**kwargs) -> AttrDict(one=1, two=2)

    Nested dicts are converted to AttrDict recursively; lists and tuples are
    stored as lists whose dict elements are converted as well.

    Ref: https://github.com/makinacorpus/easydict

    Note: keys should be strings, because every key is mirrored as an
    instance attribute.
    """

    def __init__(self, *args, **kwargs):
        """Build the dict from the same arguments that ``dict()`` accepts."""
        meta_dict = dict(*args, **kwargs)
        self.__parse(meta_dict)

    def __setattr__(self, key, val):
        """Store ``val`` under ``key`` as both an attribute and an item.

        Raises:
            ValueError: on Python 3 when ``key`` is neither str nor bytes.
        """
        if sys.version_info.major == 3 and not isinstance(key, (str, bytes)):
            raise ValueError('the key of dict should be string type')
        if isinstance(val, (list, tuple)):
            # Sequences become lists; dict elements get attribute access too.
            val = [self.__class__(temp) if isinstance(temp, dict) else temp for temp in val]
        elif isinstance(val, dict):
            val = self.__class__(val)
        super(AttrDict, self).__setattr__(key, val)
        super(AttrDict, self).__setitem__(key, val)

    # Item assignment goes through the same conversion and mirroring.
    __setitem__ = __setattr__

    def __delattr__(self, key):
        """Remove ``key`` from both the attributes and the items."""
        super(AttrDict, self).__delattr__(key)
        super(AttrDict, self).__delitem__(key)

    __delitem__ = __delattr__

    def __parse(self, meta_dict):
        """Copy every pair of ``meta_dict`` into self via ``__setattr__``."""
        for key, val in meta_dict.items():
            self.__setattr__(key, val)

    def update(self, *args, **kwargs):
        """Update from ``dict()``-style arguments, converting nested dicts."""
        meta_dict = dict(*args, **kwargs)
        self.__parse(meta_dict)

    def pop(self, key, default=None):
        """Remove ``key`` and return its value, or ``default`` if it is absent.

        The mirrored attribute is dropped tolerantly, so a missing key yields
        ``default`` instead of raising AttributeError.
        """
        self.__dict__.pop(key, None)
        return super(AttrDict, self).pop(key, default)

配置文件

  • config.py 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import argparse
import sys

import yaml
import ujson as json

from attr_dict import AttrDict


def load_config(config_path, display=False):
    """Load a YAML configuration file into an AttrDict.

    Args:
        config_path: path of the YAML file to read.
        display: when true, print the loaded config as sorted, indented JSON.

    Returns:
        AttrDict built from the parsed YAML document. An empty document
        yields an empty AttrDict.
    """
    # Python 2's built-in open() has no ``encoding`` parameter, so it is only
    # passed on Python 3.
    open_kwargs = {} if sys.version_info.major == 2 else {'encoding': 'utf-8'}
    # The context manager closes the file even if parsing fails.
    with open(config_path, **open_kwargs) as config_file:
        # NOTE(security): FullLoader is not meant for untrusted input; use
        # yaml.safe_load if config files can come from outside the project.
        raw_config = yaml.load(config_file, Loader=yaml.FullLoader)
    # yaml.load returns None for an empty document.
    config = AttrDict(raw_config or {})
    if display:
        print(json.dumps(config, sort_keys=True, indent=4))
    return config


def main():
    """Load the sample configuration file and print it."""
    load_config('source/test.yaml', display=True)


if __name__ == '__main__':
    main()

日志

  • logger.py 日志
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import logging
import multiprocessing


# ANSI escape sequences used to colour terminal output.
COLOR_E = '\033[0m'  # end (reset)

# Regular-weight colours.
COLOR_R = '\033[0;91m'  # red
COLOR_G = '\033[0;92m'  # green
COLOR_Y = '\033[0;93m'  # yellow
COLOR_B = '\033[0;94m'  # blue
COLOR_P = '\033[0;95m'  # purple
COLOR_C = '\033[0;96m'  # cyan
COLOR_W = '\033[0;97m'  # white

# Bold colours.
COLOR_B_R = '\033[1;91m'  # bold red
COLOR_B_G = '\033[1;92m'  # bold green
COLOR_B_Y = '\033[1;93m'  # bold yellow
COLOR_B_B = '\033[1;94m'  # bold blue
COLOR_B_P = '\033[1;95m'  # bold purple
COLOR_B_C = '\033[1;96m'  # bold cyan
COLOR_B_W = '\033[1;97m'  # bold white

# logging.Formatter templates: plain, and with a blue logger name and a green
# level name.
BASIC_FORMAT = '[%(name)s] %(asctime)s [%(levelname)s] %(message)s'
COLOR_FORMAT = ''.join([
    '[', COLOR_B, '%(name)s', COLOR_E, '] %(asctime)s [',
    COLOR_G, '%(levelname)s', COLOR_E, '] %(message)s',
])

# Per-level message templates; each keeps one '{}' placeholder for the message.
COLOR_DEBUG = COLOR_E + '{}' + COLOR_E
COLOR_INFO = COLOR_P + '{}' + COLOR_E
COLOR_WARNING = COLOR_Y + '{}' + COLOR_E
COLOR_ERROR = COLOR_R + '{}' + COLOR_E


class Logger(object):
    """Thin wrapper around ``logging.Logger`` with optional colour or file output.

    A stream handler is always attached. When ``log_path`` is given, a file
    handler is attached as well and colour is switched off.

    Args:
        name: name passed to ``logging.getLogger``.
        is_debug: log at DEBUG level when true, INFO otherwise.
        is_color: wrap console output in ANSI colour codes.
        log_path: optional path of a log file.
        log_mode: file mode used to open ``log_path``.
    """

    def __init__(self, name, is_debug=True, is_color=True, log_path=None, log_mode='w'):
        self.logger = logging.getLogger(name)
        self.is_debug = is_debug
        self.is_color = is_color
        self.log_path = log_path
        self.log_mode = log_mode
        if self.log_path is not None:
            # Both handlers share one formatter, so colour is disabled when
            # logging to a file.
            self.is_color = False
        self.init_logger()

    def init_logger(self):
        """Rebuild the handlers from the current settings.

        NOTE: the file handler is reopened with ``self.log_mode`` on every
        call, so with the default mode 'w' a re-init truncates the log file.
        """
        # Clean
        self.clean_handler()
        # Level
        self.logger.setLevel(logging.DEBUG if self.is_debug else logging.INFO)
        # Formatter
        formatter = logging.Formatter(
            fmt=COLOR_FORMAT if self.is_color else BASIC_FORMAT,
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        # Handler - Stream
        handler_stream = logging.StreamHandler()
        handler_stream.setFormatter(formatter)
        self.logger.addHandler(handler_stream)
        # Handler - File
        if self.log_path is not None:
            handler_file = logging.FileHandler(self.log_path, mode=self.log_mode)
            handler_file.setFormatter(formatter)
            self.logger.addHandler(handler_file)

    def clean_handler(self):
        """Detach and close every handler attached to the wrapped logger."""
        # Iterate over a copy because removeHandler mutates the list.
        for handler in list(self.logger.handlers):
            self.logger.removeHandler(handler)
            # removeHandler does not close the handler; close it explicitly
            # so file handlers release their file.
            handler.close()

    def set_debug(self, is_debug=True):
        """Switch between DEBUG and INFO level and rebuild the handlers."""
        self.is_debug = is_debug
        self.init_logger()

    def set_color(self, is_color=True):
        """Toggle colour output; enabling colour drops the log file."""
        self.is_color = is_color
        if self.is_color:
            self.log_path = None
        self.init_logger()

    def set_log(self, log_path=None, log_mode='w'):
        """Set or clear the log file; setting one disables colour."""
        self.log_path = log_path
        self.log_mode = log_mode
        if self.log_path is not None:
            self.is_color = False
        self.init_logger()

    def debug(self, msg):
        """Log ``msg`` at DEBUG level."""
        if self.is_color:
            msg = COLOR_DEBUG.format(msg)
        self.logger.debug(msg)

    def info(self, msg):
        """Log ``msg`` at INFO level."""
        if self.is_color:
            msg = COLOR_INFO.format(msg)
        self.logger.info(msg)

    def warning(self, msg):
        """Log ``msg`` at WARNING level."""
        if self.is_color:
            msg = COLOR_WARNING.format(msg)
        self.logger.warning(msg)

    def error(self, msg):
        """Log ``msg`` at ERROR level."""
        if self.is_color:
            msg = COLOR_ERROR.format(msg)
        self.logger.error(msg)


class SaveLogger(Logger):
    """Logger with ``lock_*`` variants that log while holding a multiprocessing lock.

    Args:
        name: logger name.
        debug: forwarded to Logger as ``is_debug``.
        color: forwarded to Logger as ``is_color``.
        log_path: forwarded to Logger as ``log_path``.
        mode: forwarded to Logger as ``log_mode``.
    """

    def __init__(self, name, debug=True, color=True, log_path=None, mode='w'):
        super(SaveLogger, self).__init__(
            name,
            is_debug=debug,
            is_color=color,
            log_path=log_path,
            log_mode=mode,
        )
        # One lock per instance, created with multiprocessing.Lock().
        self.lock = multiprocessing.Lock()

    def lock_debug(self, msg):
        """Log ``msg`` at DEBUG level while holding the lock."""
        self.lock.acquire()
        try:
            self.debug(msg)
        finally:
            self.lock.release()

    def lock_info(self, msg):
        """Log ``msg`` at INFO level while holding the lock."""
        self.lock.acquire()
        try:
            self.info(msg)
        finally:
            self.lock.release()

    def lock_warning(self, msg):
        """Log ``msg`` at WARNING level while holding the lock."""
        self.lock.acquire()
        try:
            self.warning(msg)
        finally:
            self.lock.release()

    def lock_error(self, msg):
        """Log ``msg`` at ERROR level while holding the lock."""
        self.lock.acquire()
        try:
            self.error(msg)
        finally:
            self.lock.release()

计时器

  • timer.py 计时器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time


class Timer(object):
    """Stopwatch that accumulates statistics over repeated tic/toc pairs.

    Attributes set by the methods:
        tmp: timestamp recorded by the last ``tic()``.
        val: duration in seconds measured by the last ``toc()``.
        cnt: number of completed ``toc()`` calls.
        ttl: sum of all measured durations.
        avg: ``ttl / cnt``.
    """

    def __init__(self):
        self.reset()

    def tic(self):
        """Record the start time of a measurement."""
        self.tmp = time.time()

    def toc(self):
        """Finish the measurement started by ``tic()`` and update the statistics."""
        self.val = time.time() - self.tmp
        self.cnt += 1
        self.ttl += self.val
        self.avg = self.ttl / self.cnt

    def reset(self):
        """Clear every field back to its initial zero value."""
        self.tmp = 0
        self.val = 0
        self.cnt = 0
        self.ttl = 0
        self.avg = 0

路径管理

  • path.py 路径管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import glob
import os
import shutil
import sys
from os import scandir

from pipeline.L99utils.logger import Logger


# Module-level logger; create_folder and remove_folder use it to report changes.
logger = Logger(name="path")


def parse_folder(path, mode='file'):
    """Recursively list the files or sub-folders below ``path``.

    Entries whose name starts with '.' are skipped, and hidden folders are
    not descended into. Entries that are neither a file nor a directory
    (for example dangling symlinks) are ignored.

    Args:
        path: folder to walk, as a str.
        mode: 'file' to collect file paths, 'folder' to collect folder paths.

    Returns:
        List of '<path>/<name>' strings in the order ``scandir`` yields them.
    """
    path_list = []
    for entry in scandir(path.encode('utf-8')):
        if entry.name.startswith(b'.'):  # skip hidden file
            continue
        sub_path = '{}/{}'.format(path, entry.name.decode('utf-8'))
        if entry.is_file():
            if mode == 'file':
                path_list.append(sub_path)
        elif entry.is_dir():
            # Only real directories are recursed into; scandir would raise
            # on anything else.
            if mode == 'folder':
                path_list.append(sub_path)
            path_list.extend(parse_folder(sub_path, mode))
    return path_list


def iparse_folder(path, mode='file'):
    """Lazily yield the files or sub-folders below ``path``, recursively.

    Entries whose name starts with '.' are skipped, and hidden folders are
    not descended into. Entries that are neither a file nor a directory
    (for example dangling symlinks) are ignored.

    Args:
        path: folder to walk, as a str.
        mode: 'file' to yield file paths, 'folder' to yield folder paths.

    Yields:
        '<path>/<name>' strings in the order ``scandir`` yields them.
    """
    for entry in scandir(path.encode('utf-8')):
        if entry.name.startswith(b'.'):  # skip hidden file
            continue
        sub_path = '{}/{}'.format(path, entry.name.decode('utf-8'))
        if entry.is_file():
            if mode == 'file':
                yield sub_path
        elif entry.is_dir():
            if mode == 'folder':
                yield sub_path
            # Recurse through the generator itself so sub-trees stay lazy
            # instead of being built as full lists first.
            for nested_path in iparse_folder(sub_path, mode):
                yield nested_path


def create_folder(folder):
    """Create ``folder`` and any missing parents, if it does not exist yet.

    Does nothing when ``folder`` is empty or already a directory. A log line
    with the absolute path is written after a creation attempt.

    Args:
        folder: path of the directory to create.
    """
    if not folder or os.path.isdir(folder):
        return
    # exist_ok=True avoids a FileExistsError if another process creates the
    # folder between the isdir() check and this call.
    os.makedirs(folder, exist_ok=True)
    logger.info('create folder => {}'.format(os.path.abspath(folder)))


def remove_folder(folder):
    """Delete ``folder`` and everything in it, if it is an existing directory.

    Does nothing otherwise. A log line with the absolute path is written
    after the removal.

    Args:
        folder: path of the directory to remove.
    """
    if not os.path.isdir(folder):
        return
    shutil.rmtree(folder)
    removed_path = os.path.abspath(folder)
    logger.info('remove folder => {}'.format(removed_path))

Share