数据同步中不可避免的就是一大串的sql语句,常用的group by、 order by asc desc、limit、bettwen and、like ‘%x’、in、not in 、is null、is not null 、left join on、join on、right join on已经不够用了。因此记录一下,稍微复杂一点的sql语句。
1. join匹配多条只选一条
select
*
from
d
left join (
select
*,
row_number() over (partition by a1
order by
field(a2,'3','2')) as rn FROM c) t2 on d.a1=t2.a1
WHERE
t2.rn = 1;
2.join 一对多会发生什么
不管使用哪种连接方式,只要存在1对多,那么得到的查询结果都会比主表记录数多
3. field 自定义排序
SELECT *
FROM products
ORDER BY FIELD(category, 'Clothing', 'Home', 'Electronics');
4.case when end as条件语句
SELECT
a1,
c1,
c2,
c3,
CASE
when c1 is null and c2 is null then -c3
WHEN c1 < c2 and c1 is not null or c2 is null THEN c1 - c3
WHEN c1 >= c2 and c2 is not null or c1 is null THEN c2 - c3
END AS result
FROM B;
5.获取当月首日和末日0点
select
STR_TO_DATE(DATE_FORMAT(last_day(current_date()), '%Y-%m-01 00:00:00'),
'%Y-%m-%d %H:%i:%s') as first_day_of_month;
select
STR_TO_DATE(DATE_FORMAT(last_day(current_date()), '%Y-%m-%d 00:00:00'),
'%Y-%m-%d %H:%i:%s') as last_day_of_month;
-- 获取前三个月首日
SELECT DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 3 MONTH), '%Y-%m-01') AS first_day_of_three_months_ago;
6.having获取重复数据并计数
SELECT department, COUNT(*) AS employee_count
FROM employees
GROUP BY department
HAVING COUNT(*) > 1;
7.ifnull的高效之处
判断是否在有效期内,而失效日期可能为null时。
使用BETWEEN cr.EFF_DT AND IFNULL(cr.EXP_DT, STR_TO_DATE('20991231', '%Y%m%d')):
比BETWEEN cr.EFF_DT AND cr.EXP_DT OR cr.EXP_DT IS NULL:高效
BETWEEN cr.EFF_DT AND IFNULL(cr.EXP_DT, STR_TO_DATE('20991231', '%Y%m%d')):
这个条件首先检查 cr.EFF_DT 和 cr.EXP_DT 之间的范围。如果 cr.EXP_DT 是 NULL,则使用一个固定的日期(2099-12-31)作为替代。这使得查询可以在一次比较中处理有效日期和 NULL 的情况。
因为这个条件总是会产生一个有效的范围(即使是用2099年作为替代),数据库引擎可以更有效地使用索引来查找符合条件的记录。
BETWEEN cr.EFF_DT AND cr.EXP_DT OR cr.EXP_DT IS NULL:
这个条件则需要分开处理两种情况:一种是 cr.EXP_DT 不为 NULL 的情况,另一种是 cr.EXP_DT 为 NULL 的情况。
这种写法需要数据库引擎在执行时进行更多的逻辑判断,这可能导致更复杂的查询计划和较低的执行效率,特别是在数据量大时。
对于每一行,数据库都必须评估 OR 条件,因此可能会导致更多的行被扫描,尤其是当 EXP_DT 的 NULL 值占比较高时。
总结来说,使用 IFNULL 将 NULL 值替换为一个固定日期的策略可以简化查询逻辑,使得数据库引擎在执行时能够更高效地利用索引和优化查询计划,从而提高执行效率。
sum(case when cp.PASSPORT_TYPE = 'OTHR' then 1 else 0 end) as OTHR_CNT
12.按月汇总
-- mysql
SELECT
str_to_date(DATE_FORMAT(duty_date, '%Y%m01'),'%Y%m%d') AS month,
employee_id,
SUM(work_hours) AS total_work_hours
FROM
DutySchedule
WHERE
duty_date>=DATE(CONCAT(YEAR(CURDATE()), '-01-01'))
GROUP BY
month, employee_id;
-- oracle
SELECT
TRUNC(duty_date, 'MM') AS month,
employee_id,
SUM(work_hours) AS total_work_hours
FROM
DutySchedule
WHERE
EXTRACT(YEAR FROM duty_date) = EXTRACT(YEAR FROM SYSDATE)
GROUP BY
TRUNC(duty_date, 'MM'), employee_id;
13.公共表表达式 (CTE)
公共表表达式 (CTE): 使用 WITH 语句将三个不同的查询合并为一个名为 MonthlyWorkHours 的 CTE,这样可以减少重复代码。
WITH MonthlyWorkHours AS (
SELECT
idx,
e.employee_id,
e.department,
e.position,
DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL idx MONTH), '%Y-%m') AS month,
SUM(ws.work_hours) AS total_work_hours,
hq.limit_duration AS hq_limit_duration,
pd.limit_duration AS pd_limit_duration
FROM
(SELECT employee_id, department, position FROM employees) e
LEFT JOIN
DutySchedule ws ON e.employee_id = ws.employee_id
LEFT JOIN
HQLimitDuration hq ON e.department = hq.department AND e.position = hq.position
LEFT JOIN
PersonalLimitDuration pd ON e.employee_id = pd.employee_id
AND CURRENT_DATE BETWEEN pd.start_date AND pd.end_date
CROSS JOIN (SELECT 0 AS idx UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3) AS idx -- 生成 0-3 的索引
WHERE
(idx = 0 AND ws.duty_date >= DATE_FORMAT(CURDATE(), '%Y-01-01'))
OR (idx = 1 AND ws.duty_date >= DATE_FORMAT(CURDATE(), '%Y-%m-01'))
OR (idx = 2 AND ws.duty_date >= DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 MONTH), '%Y-%m-01'))
OR (idx = 3 AND ws.duty_date >= DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 3 MONTH), '%Y-%m-01'))
GROUP BY idx,
e.employee_id,
e.department,
e.position, hq.limit_duration,
pd.limit_duration,month
)
SELECT
g1.month,
g1.employee_id,
CASE
WHEN g1.remaining_work_hours is NOT NULL or g2.remaining_work_hours IS NOT NULL or g3.remaining_work_hours IS NOT NULL THEN
LEAST(IFNULL(g1.remaining_work_hours, 99999999), IFNULL(g2.remaining_work_hours, 99999999), IFNULL(g3.remaining_work_hours, 99999999))
ELSE
NULL
END AS remaining_work_hours
FROM
(SELECT
month,
employee_id,
LEAST(IFNULL(hq_limit_duration, 0), IFNULL(pd_limit_duration, 0)) - IFNULL(total_work_hours, 0) AS remaining_work_hours
FROM
MonthlyWorkHours
WHERE idx = 0) g1
left JOIN
(SELECT
month,
employee_id,
LEAST(IFNULL(hq_limit_duration, 0), IFNULL(pd_limit_duration, 0)) - IFNULL(total_work_hours, 0) AS remaining_work_hours
FROM
MonthlyWorkHours
WHERE idx = 1) g2 ON g1.employee_id = g2.employee_id AND g1.month = g2.month
left JOIN
(SELECT
month,
employee_id,
LEAST(IFNULL(hq_limit_duration, 0), IFNULL(pd_limit_duration, 0)) - IFNULL(total_work_hours, 0) AS remaining_work_hours
FROM
MonthlyWorkHours
WHERE idx = 3) g3 ON g1.employee_id = g3.employee_id AND g1.month = g3.month
ORDER BY
g1.month;
import pytest
from data import Data
from checkdata import CheckData
class TestCase:
@pytest.mark.parametrize('data',CheckData(Data.order_data_model, Data.order_data_define).get_null_data_list())
def test_case(self,data):
# res = create_order(data)
# assert res.text['success'] == False
assert data is None
if __name__ == '__main__':
pytest.main()
#输入-输出
a = input('输出dotcpp的网站:')
b = float(input('我认为适宜的温度:'))
index = list(map(int,input().split()))
#这种方式可以输入任意个int型的数字,在这里采用列表来存储。输入时空格分开
print(index)
print(index[0])
#数值类型
int float bool
c = 9
d =9.9
e = True
数值运算符
+ - * / //取整 %取余
比较运算符
> < >= <= == != is is not
转义符 \
print("children\'s class") #children's class
制表符 \t
[:0] [0:1] [0:5:2]
#索引-找单个[:3]-第4个-d
#结果=数据[索引值]
s1 = ‘abcde’
res = s1[2] #正向c-从0开始
res = s1[-1] #反向e-从-1开始
循环打印单个字符串
s1 = 'abcde'
for i in range(len(s1)):
print(s1[i])
i = i+1
#切片-找多个[1:3]-取头不取尾-取第2、3位-bc
#结果=数据[start:end]
#切片2-跨步取值-[0:5:2] -ab cd e -跨两步,得ace
列表:[]
元组:() 不可修改
八、替换-replace
s ='123rrr123ttt123'
res=s.replace('123','999',2)
#参数1被参数2替换,替换参数3个,无参数3时全替换
#只能替换字符串型,不能替换数值型
九、查找-find-返回第一个位置索引
s ='123rrr123ttt123'
res=s.find('23')
res=s.find('23',4,10) #从第五开始找
set={11,22,33}
set = set() #定义空集合
集合内没有重复元素
去除列表内重复元素:list-set-list\
li = [1,22,22,33]
a = list(set(li))
十七、随机数
import random
res = random.randint(1,999) 随机整数
res = random.random() 随机小数,0-1之间
res = random.uniform(10,20) 10-20之间的小数
li = [1,33,55]
res = random.choice(li) 在列表中挑选一个随机值
十八、条件判断
if elif else
逻辑运算:and or not
成员运算:in ,not in
身份运算:is ,is not 判断内存id
赋值运算:= ,+= ,-= ,*= ,/= ,%=
十九、循环
while True:break continue
for i in [1,2,3]/dic/dic.value/dic.items():
元组拆分-遍历键和值:
for k,v in dic.items():
列表循环
li2 = ['www{}'.format(i) for i in range(10)]
li = [ i+1 for i in (11,22,33)]
二十、range内置函数
生成指定序列
li = list(range(100))
li = list(range(9,99,2))
# excel的使用
# Excel使用xlsx格式,解析成三个对象,工作簿workbook、表单Sheet、单元格Cell
import openpyxl
workbook = openpyxl.load_workbook('test1.xlsx')
print(workbook.sheetnames)
sh = workbook['Sheet1']
print(sh.cell(row=3,column=3).value)
#按行读取 rows 按列读取 column
res = list(sh.rows)
for i in res:
print(i,i.value)
# 转为字典格式
res = list(sh.rows)
title = [i.value for i in res[0]]
cases = []
for item in res[1:]:
data = [i.value for i in item]
dic = dict(zip(title,data))
print(dic)
cases.append(dic)
@ddt
class TestMusen(unittset.TestCase)
@list_data(cases)
def test(self, item):
expected = eval(item['espected'])
params = eval(item['data'])
res = func(**params)
self.assertEqual(expectet,res)
# 结果
{'case':'1','data':'2','expected:'3'}
{'case':'1','data':'2','expected:'3'}
# 封装Excel
import openpyxl
class HandleExcel:
def __init__(self,filename,sheetname):
self.filename = filename
self.sheetname = sheetname
self.workbook = openpyxl.load_workbook(filename)
self.sheet = self.workbook[sheetname]
def read_data(self):
res = list(self.sheet.rows)
title = [i.value for i in res[0]]
cases = []
for item in res[1:]:
data = [i.value for i in item]
dic = dict(zip(title, data))
cases.append(dic)
return cases
def write_data(self,row_no:int,colunm_no:int,value):
self.sheet.cell(row=row_no,column=colunm_no,value=value)
self.workbook.save(filename=self.filename)
@ddt
class TestMusen(unittest.TestCase):
excel = HandleExcel('仓库作业-入库.xlsx', 'Sheet1')
cases = excel.read_data()
@list_data(cases)
def test(self, item):
expected = eval(item['expected'])
params = eval(item['params'])
res = func(**params)
self.assertEqual(True, res)
readyaml.py
import yaml
with open('yaml.yaml','r',encoding='utf-8') as f:
res =yaml.load(f,Loader=yaml.Loader)
print(res,type(res))
print(res['case']['data'])