Dust8 的博客

读书百遍其义自见

0%

有台服务器只给内部人员用, 所以就添加了白名单. 但是 ip 总是变, 手动改太麻烦了,
就写了个定时查询出 ip, 发现 ip 变了就自动修改阿里云的安全组设置的脚本.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/env python
# coding=utf-8
# https://helpcdn.aliyun.com/document_detail/25699.html
# https://helpcdn.aliyun.com/document_detail/25554.html
import logging
import time

import requests
from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkecs.request.v20140526.AuthorizeSecurityGroupRequest import (
AuthorizeSecurityGroupRequest,
)
from aliyunsdkecs.request.v20140526.RevokeSecurityGroupRequest import (
RevokeSecurityGroupRequest,
)

# Log line layout: timestamp, level name, message.
FORMAT = "%(asctime)-15s %(levelname)s %(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT)

# Replace the placeholders with your own RAM account values.
# NOTE(review): presumably (access key id, access key secret, region id) -- confirm against the SDK.
client = AcsClient(
"x", "x", "x"
)


def get_ip(timeout=10):
    """Return the caller's public IP address as a ``/24`` CIDR string.

    The address is read from the ``origin`` field of the JSON document
    served by httpbin's ``/ip`` endpoint.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout a stalled connection would block the
            polling loop forever.

    Returns:
        A string such as ``"1.2.3.4/24"``.

    Raises:
        requests.RequestException: On connection errors, timeouts or a
            non-2xx HTTP status.
        KeyError: If the response JSON has no ``origin`` field.
    """
    url = "http://www.httpbin.org/ip"
    res = requests.get(url, timeout=timeout)
    # Fail loudly on an error page instead of trying to parse it as JSON.
    res.raise_for_status()

    origin = res.json()["origin"]
    # Defensive: if ``origin`` is a comma-separated list, keep the first entry.
    ip = origin.split(",")[0].strip()
    return ip + "/24"


def del_group(security_group_id, ip_protocol, port_range, source_cidr_ip):
    """Remove a security-group rule.

    Builds a ``RevokeSecurityGroupRequest`` from the arguments and sends
    it through the module-level ``client``.

    Args:
        security_group_id: Id of the security group to modify.
        ip_protocol: Protocol of the rule, e.g. ``"tcp"``.
        port_range: Port range of the rule, e.g. ``"9000/9999"``.
        source_cidr_ip: Source CIDR block of the rule to remove.

    Raises:
        Whatever ``client.do_action_with_exception`` raises
        (presumably ``ClientException`` / ``ServerException``).
    """
    request = RevokeSecurityGroupRequest()
    request.set_accept_format("json")
    request.set_SecurityGroupId(security_group_id)
    request.set_PortRange(port_range)
    request.set_IpProtocol(ip_protocol)
    request.set_SourceCidrIp(source_cidr_ip)

    # The response body is not used by this script.
    client.do_action_with_exception(request)


def add_group(security_group_id, ip_protocol, port_range, dsecription, source_cidr_ip):
    """Add a security-group rule.

    Builds an ``AuthorizeSecurityGroupRequest`` from the arguments and
    sends it through the module-level ``client``.

    Args:
        security_group_id: Id of the security group to modify.
        ip_protocol: Protocol of the rule, e.g. ``"tcp"``.
        port_range: Port range of the rule, e.g. ``"9000/9999"``.
        dsecription: Free-text description stored with the rule. The
            misspelled name is kept because callers pass it by keyword.
        source_cidr_ip: Source CIDR block to allow.

    Raises:
        Whatever ``client.do_action_with_exception`` raises
        (presumably ``ClientException`` / ``ServerException``).
    """
    request = AuthorizeSecurityGroupRequest()
    request.set_accept_format("json")

    request.set_SecurityGroupId(security_group_id)
    request.set_IpProtocol(ip_protocol)
    request.set_PortRange(port_range)
    request.set_Description(dsecription)
    request.set_SourceCidrIp(source_cidr_ip)

    # The response body is not used by this script.
    client.do_action_with_exception(request)


def main():
    """Poll the public IP forever and keep the whitelist rule in sync.

    Every 60 seconds the current public CIDR is fetched with ``get_ip``.
    When it differs from the last known value, the old rule is removed
    with ``del_group`` and a rule for the new CIDR is added with
    ``add_group``. A failed IP lookup is retried after 5 seconds.

    Errors raised by the security-group calls are not caught here and
    terminate the loop.
    """
    security_group_id = "修改为自己的安全组id"
    ip_protocol = "tcp"
    port_range = "9000/9999"
    dsecription = "公司ip"

    # CIDR assumed to be whitelisted when the script starts; it is the
    # first rule that will be revoked.
    old_ip = "192.168.0.1/24"

    while True:
        try:
            new_ip = get_ip()
        except Exception:
            # Catch Exception rather than everything so Ctrl-C and
            # SystemExit still stop the script; log the cause before retrying.
            logging.warning("failed to query public ip, retrying", exc_info=True)
            time.sleep(5)
            continue

        if old_ip != new_ip:
            logging.info("%s => %s", old_ip, new_ip)
            del_group(
                security_group_id=security_group_id,
                ip_protocol=ip_protocol,
                port_range=port_range,
                source_cidr_ip=old_ip,
            )
            add_group(
                security_group_id=security_group_id,
                ip_protocol=ip_protocol,
                port_range=port_range,
                dsecription=dsecription,
                source_cidr_ip=new_ip,
            )
            old_ip = new_ip

        time.sleep(60)


if __name__ == "__main__":
    main()

简单自定义

这个官网有教程, 可以点击下面的参考链接看.

1
2
3
4
5
6
class MyModel(models.Model):
    """Example model showing two string forms of ``upload_to``.

    The two assignments are alternatives; as written, the second
    ``upload`` replaces the first in the class body.
    """

    # file will be uploaded to MEDIA_ROOT/uploads
    upload = models.FileField(upload_to='uploads/')
    # or...
    # file will be saved to MEDIA_ROOT/uploads/2015/01/30
    upload = models.FileField(upload_to='uploads/%Y/%m/%d/')
1
2
3
4
5
6
def user_directory_path(instance, filename):
    """Return the upload path ``user_<id>/<filename>`` for ``instance``.

    Args:
        instance: Object exposing ``user.id``.
        filename: Name of the uploaded file.

    Returns:
        A path relative to MEDIA_ROOT, e.g. ``"user_7/photo.png"``.
    """
    # file will be uploaded to MEDIA_ROOT/user_<id>/<filename>
    return f'user_{instance.user.id}/{filename}'

class MyModel(models.Model):
    """Example model whose upload path is computed by ``user_directory_path``."""

    upload = models.FileField(upload_to=user_directory_path)

再复杂一些

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def _upload_to(attrs=None,root=''):
def upload_to(instance, filename):
ins = instance

if attrs:
for attr in attrs.split('.'):
ins = getattr(ins,attr)

path = f'{root}{ins}/{filename}'
else:
path = f'{root}{filename}'

return path

return upload_to


class TBusinesschain(models.Model):
    """Model whose logo is stored under ``business/chain/<bc_id>/``."""

    bc_id = models.AutoField(db_column='BC_Id', primary_key=True)
    # The upload directory is derived from this row's own primary key.
    bc_logo = models.FileField(
        upload_to=_upload_to('bc_id', 'business/chain/'),
        db_column='BC_Logo',
        max_length=50,
        verbose_name='店铺Log',
    )


class TBusinesscoupon(models.Model):
    """Model whose logo is stored under ``business/<shop bi_id>/``."""

    bc_id = models.AutoField(db_column='BC_Id', primary_key=True)
    # NOTE(review): the original ForeignKey(...) call was cut off after
    # ``on_delete``; any further arguments (e.g. ``db_column``) are
    # unknown and must be restored from the real model -- TODO confirm.
    bc_shopid = models.ForeignKey(TBusiness, on_delete=models.CASCADE)
    # The upload directory follows the relation: instance.bc_shopid.bi_id.
    bc_logo = models.FileField(
        upload_to=_upload_to('bc_shopid.bi_id', 'business/'),
        db_column='BC_Logo',
        max_length=50,
        verbose_name='logo',
    )

参考链接

有个项目是用其他语言写的接口, 需要快速开发出后台管理页面, 就选用了 django. 按照下面的方法就可以在几分钟内开发完成.

生成模型文件

这个官网有教程, 可以点击下面的参考链接看. 只需要运行一条命令就可以生成模型文件

1
python manage.py inspectdb > models.py

生成后台管理文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# lib/gen_admin.py
import inspect
import os
from pathlib import Path
import sys

import django


def get_classes(arg):
    """Return every class found among the members of ``arg``.

    Args:
        arg: Object to inspect, typically a module.

    Returns:
        A list of class objects in the order produced by
        ``inspect.getmembers`` (sorted by member name). Classes that
        were merely imported into ``arg`` are included as well.
    """
    return [member for _, member in inspect.getmembers(arg, inspect.isclass)]


def gen_admin(model):
    """Generate the ``admin.py`` source for one model.

    Args:
        model: Model class; ``model.__name__`` and the ``name`` of each
            entry in ``model._meta.fields`` are used.

    Returns:
        Source text that registers ``<Model>Admin`` with ``list_display``
        set to a tuple of all field names. The text starts with a blank
        line and ends with a newline.
    """
    name = model.__name__
    # repr() of a tuple is always valid tuple syntax, including the
    # trailing comma for a single field; a hand-joined "('id')" would
    # be a plain string instead.
    columns = tuple(field.name for field in model._meta.fields)
    return (
        f"\n@admin.register({name})\n"
        f"class {name}Admin(admin.ModelAdmin):\n"
        f"    list_display = {columns!r}\n"
    )


def gen_admins(models):
    """Concatenate the ``gen_admin`` output for every model in ``models``.

    Args:
        models: Iterable of model classes.

    Returns:
        The generated admin source for all models as one string; empty
        when ``models`` is empty.
    """
    return "".join(gen_admin(model) for model in models)


def main():
    """Print generated admin classes for the models of the ``areas`` app.

    The directory two levels above this file is appended to ``sys.path``
    and Django is configured before the app is imported.
    """
    base_dir = Path(__file__).resolve(strict=True).parent.parent
    sys.path.append(str(base_dir))

    # Change this to your own settings module.
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dazhong.settings")
    django.setup()

    # The app to generate admin classes for; imported only after
    # django.setup() has run.
    import areas

    classes = get_classes(areas.models)
    result = gen_admins(classes)
    print(result)


if __name__ == "__main__":
    main()

参考链接

常见的我就不说了, 我下面说一些我用到的, 又不容易搜索到的知识. 其实官网文档有说明, 可能会漏看,或者看过不知道用在什么场景.

嵌套序列化过滤

在一对多模型中, 如果通过"一"来查出"多"的模型, 会把"多"的模型数据全部返回. 但是我现在只需要返回"多"的模型里面的一部分数据, 例如没有被逻辑删除的数据. 这时候可以指定"多"模型序列化器的 list_serializer_class, 并重写 ListSerializer 的 to_representation 方法.
.to_representation() - Override this to support serialization, for read operations.

1
2
3
4
5
6
7
8
9
10
11
12
class CustomListSerializer(serializers.ListSerializer):
    """List serializer that leaves out rows flagged ``is_deleted``."""
    ...

    def to_representation(self, instance):
        """Serialize only the rows of ``instance`` with ``is_deleted=False``.

        ``instance`` must provide ``.filter()`` (e.g. a manager or
        queryset); a plain list is not supported.
        """
        visible = instance.filter(is_deleted=False)
        return super().to_representation(visible)

class CustomSerializer(serializers.Serializer):
    """Item serializer that sets ``CustomListSerializer`` as its list serializer class."""
    ...

    class Meta:
        list_serializer_class = CustomListSerializer

增加额外的字段

还是一对多的模型里面,想要在一的返回数据里面增加一个多的数量字段.还是可以重写 to_representation 方法.同理也可以减少或者修改字段和值.

1
2
3
4
5
6
7
def to_representation(self, instance):
    """Serialize ``instance`` and add a ``many_num`` count field.

    ``instance`` is used as a single object here: it is passed as the
    ``one`` lookup value and the parent result is indexed like a
    mapping. The original called ``instance.filter(...)`` first, which
    does not fit that usage and has been removed.

    NOTE(review): if only non-deleted rows should be counted, add
    ``is_deleted=False`` to the ``Many`` filter -- confirm intent.
    """
    ret = super().to_representation(instance)

    ret['many_num'] = Many.objects.filter(one=instance).count()

    return ret

越级筛选数据

这个是因为多层嵌套模型,在序列化中传值问题,例如把请求的参数传入到后面几层的序列化里面过滤.

1
2
3
4
5
6
7
def to_representation(self, instance):
    """Serialize only the rows of ``instance`` owned by the requesting user.

    The request is read from ``self.context['request']``, so the
    serializer must be given a context containing it; a missing key
    raises ``KeyError``.
    """
    request = self.context['request']

    own_rows = instance.filter(user=request.user)
    return super().to_representation(own_rows)

参考链接

前几天看到老外用 speechrecognition 写了一个语音助手, 感觉既简单又有趣,
而且我前几年用这个库做过语音转文字. 去年还买了本 <<自然语言处理实战(聊天机器人技术原理与应用)>> 的书,
还没翻过几页. 正好把这个冲动的结果给利用上.

源码: xiaohui

xiaohui 是个面向任务的对话系统.
可以查时间,打开程序, 搜索网页等任务. 具体任务可以看 xiaohui/data/nlu.md 里面的内容.
还可以自己定制任务, 对应的也需要定制 xiaohui/actions.py 里的执行动作.

主要模块

主要模块

NLU 模块我用的是 fuzzywuzzy 库来计算准确度的,简单的判断 2 个字符串的相似度. 没有用序列标注来训练和识别意图和槽值. 任务可以看 xiaohui/data/nlu.md 里面的内容.用它来和用户输入的语句做相似度对比,取最大的置信度. 这里的格式参考了 Rasa.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
## intent:greet

- 你好
- 您好
- 你叫什么名字

## intent:goodbye

- 再见
- 退出

## intent:get_time

- 现在几点
- 几点了

## intent:open_program

- 打开[记事本](program)
- 打开[google chrome](program)
- 打开[腾讯视频](program)
- 打开[百度网盘](program)

## intent:search

- 搜索[python](keyword)
- 搜索[新闻](keyword)
- 搜索[上海](keyword)

打开程序任务是查询系统的开始菜单里面的快捷方式, 要完整匹配才能打开.

1
2
3
4
5
6
7
8
class ActionOpen_program(Action):
    """Action that launches the program named in the first slot."""

    def run(self):
        """Look the program up in ``lnk.PROGRAMS`` and start it.

        The slot value must match a key of ``lnk.PROGRAMS`` exactly;
        otherwise a "not found" message is uttered.

        NOTE(review): ``subprocess.call`` waits for the launched program
        to exit -- confirm that blocking here is intended.
        """
        program_name = self.slots[0]["value"]
        program_exe = lnk.PROGRAMS.get(program_name)
        if program_exe:
            subprocess.call([program_exe])
        else:
            utter_message(f"未找到程序{program_name}")

搜索就是简单的打开浏览器搜索关键字

1
2
3
4
5
class ActionSearch(Action):
    """Action that opens a Baidu search for the keyword in the first slot."""

    def run(self):
        """Open the search results page in the default web browser."""
        from urllib.parse import urlencode

        search = self.slots[0]["value"]
        # Encode the keyword so spaces, '&', '#' and non-ASCII text cannot
        # break or truncate the query string.
        query = urlencode({"ie": "UTF-8", "wd": search})
        url = f"https://www.baidu.com/s?{query}"
        webbrowser.get().open(url)

语音识别和合成

试用了 CMU Sphinx 的离线识别, 效果太感人, 就看了下 Google Speech Recognition 的接口, 才发现不需要申请 api, 注意不是 Google Cloud Speech API.
利用 google 的语音识别和语音合成, 适配的是国内域名 cn , 所以不用担心用不了. 这里是利用的 hotfix 把代码给替换了, 把 com 替换成了 cn .

1
2
3
4
5
6
import speech_recognition as sr

from .utils import hotfix

# Replace the library's recognize_google with the patched copy from
# hotfix, so every Recognizer created afterwards uses it.
sr.Recognizer.recognize_google = hotfix.recognize_google
r = sr.Recognizer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def recognize_google(self, audio_data, key=None, language="en-US", show_all=False):
    """
    Performs speech recognition on ``audio_data`` (an ``AudioData`` instance), using the Google Speech Recognition API.

    This copy sends the request to the ``www.google.cn`` host.

    The Google Speech Recognition API key is specified by ``key``. If not specified, it uses a generic key that works out of the box. This should generally be used for personal or testing purposes only, as it **may be revoked by Google at any time**.

    To obtain your own API key, simply following the steps on the `API Keys <http://www.chromium.org/developers/how-tos/api-keys>`__ page at the Chromium Developers site. In the Google Developers Console, Google Speech Recognition is listed as "Speech API".

    The recognition language is determined by ``language``, an RFC5646 language tag like ``"en-US"`` (US English) or ``"fr-FR"`` (International French), defaulting to US English. A list of supported language tags can be found in this `StackOverflow answer <http://stackoverflow.com/a/14302134>`__.

    Returns the most likely transcription if ``show_all`` is false (the default). Otherwise, returns the raw API response as a JSON dictionary.

    Raises a ``speech_recognition.UnknownValueError`` exception if the speech is unintelligible. Raises a ``speech_recognition.RequestError`` exception if the speech recognition operation failed, if the key isn't valid, or if there is no internet connection.
    """
    assert isinstance(audio_data, AudioData), "``audio_data`` must be audio data"
    assert key is None or isinstance(key, str), "``key`` must be ``None`` or a string"
    assert isinstance(language, str), "``language`` must be a string"

    flac_data = audio_data.get_flac_data(
        convert_rate=None
        if audio_data.sample_rate >= 8000
        else 8000,  # audio samples must be at least 8 kHz
        convert_width=2,  # audio samples must be 16-bit
    )
    if key is None:
        key = "AIzaSyBOti4mM-6x9WDnZIjIeyEU21OpBXqWBgw"
    url = "http://www.google.cn/speech-api/v2/recognize?{}".format(
        urlencode({"client": "chromium", "lang": language, "key": key})
    )
    request = Request(
        url,
        data=flac_data,
        headers={
            "Content-Type": "audio/x-flac; rate={}".format(audio_data.sample_rate)
        },
    )

    # obtain audio transcription results
    try:
        response = urlopen(request, timeout=self.operation_timeout)
    except HTTPError as e:
        raise RequestError("recognition request failed: {}".format(e.reason))
    except URLError as e:
        raise RequestError("recognition connection failed: {}".format(e.reason))
    # close the HTTP response once the body has been read
    with response:
        response_text = response.read().decode("utf-8")

    # ignore any blank blocks
    actual_result = []
    for line in response_text.split("\n"):
        if not line:
            continue
        result = json.loads(line)["result"]
        if len(result) != 0:
            actual_result = result[0]
            break

    # return results
    if show_all:
        return actual_result
    if (
        not isinstance(actual_result, dict)
        or len(actual_result.get("alternative", [])) == 0
    ):
        raise UnknownValueError()

    alternatives = actual_result["alternative"]
    # ``alternatives`` is a list of dicts, so "confidence" has to be looked
    # up inside each entry; testing it against the list never matches.
    scored = [alternative for alternative in alternatives if "confidence" in alternative]
    if scored:
        # return alternative with highest confidence score
        best_hypothesis = max(
            scored,
            key=lambda alternative: alternative["confidence"],
        )
    else:
        # when there is no confidence available, we arbitrarily choose the first hypothesis.
        best_hypothesis = alternatives[0]
    if "transcript" not in best_hypothesis:
        raise UnknownValueError()
    return best_hypothesis["transcript"]

参考链接