Commit 96c2b7b7 authored by 周尚's avatar 周尚

modify lua pakcage.path

parent 447c8003
* 模块说明
| 文件名 | 说明 |
|--------------+----------------------------|
| ~base.lua~ | 基础支持 |
|--------------+----------------------------|
| ~async.lua~ | 唐库6.0 异步通知功能模块 |
|--------------+----------------------------|
| ~sync.lua~ | 唐库7.0 总控-节点 同步模块 |
| ~check.lua~ | 唐库7.0 通用字段校验 |
| ~settle.lua~ | 批处理业务 |
local _ASYNC = {}
package.path = package.path..";openresty/luabiz/?.lua;"
local mysql = require("resty.mysql")
local json = require( 'cjson')
local base = require ('base')
local function async_sql_ins()
local pay_type
local rate = 305
local reqs = json.decode(ngx.var.request_body)
if reqs["code"] ~= "SUCCESS" then
return nil
end
if string.find(reqs["trade_type"], "alipay") then
pay_type = "12"
elseif string.find(reqs["trade_type"], "weixin") then
pay_type = "11"
end
local ins_sql = string.format(
[[INSERT INTO pay_noty_trace SET host_map_id = 3, tkpay_order_no = %d, channel_order_no = %s, host_order_no = %s,
pay_type = %s, trans_amt = %s, trans_time = %s, trans_status = %s, currency = '%s', rate = %d]],
reqs['out_transaction_no'], reqs['out_trade_no'],
reqs['transaction_no'], pay_type, reqs['total_amount'] * 100,
reqs['time_start'], "0", reqs['currency'], rate)
base:log("ASYNC_SQL", ins_sql)
return ins_sql
end
local function async_db(ins_sql)
local db, err = mysql:new()
if not db then
base:log("ASYNC_DB", "failed to instantiate mysql: ", err)
return false
end
db:set_timeout(1000) -- 1 sec
local db_st = base:db_read()
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("ASYNC_DB", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
local res, err, errcode, sqlstate = db:query(ins_sql)
if not res then
base:log("ASYNC_DB", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
return
end
db:set_keepalive(10000, 100)
end
local function async()
ngx.say("SUCCESS")
ngx.eof()
local ins_sql = async_sql_ins()
async_db(ins_sql)
end
async()
return _ASYNC
local _BASE = {}
local json = require ('cjson')
function _BASE.__FILE__ () return debug.getinfo(2,'S').source end
function _BASE.__LINE__ () return debug.getinfo(2,'l').currentline end
function _BASE.log(self, fname, ...)
local fpath = string.format("nginx/logs/%s_%s.log", tostring(fname), os.date("%Y%m%d"))
local f = io.open(fpath, "a+")
local str = ""
for i=1, select("#", ...) do
local tmp= tostring(select(i, ...))
str = string.format("%s %s", str, tmp)
end
f:write(os.date("<%Y-%m-%d %H:%M:%S> "))
-- f:write(...)
f:write(str)
f:write("\n")
f:close()
end
function _BASE.db_read(self, dbpath)
local f = io.open("nginx/conf/"..dbpath, "r")
local tmp = f:read("a*")
local db_st = json.decode(tmp)
f:close()
return db_st
end
function _BASE.close_db(self, db)
if not db then
return
end
local ok, err = db:close()
if not ok then
_BASE.log("failed to close:", err)
end
return
end
function _BASE.sql_concate(self, operate, tb_name, tb_data)
local str = ""
if string.lower(string.sub(operate, 0, 6)) == 'insert' then
str = "INSERT INTO "..tb_name.." SET "
elseif string.lower(string.sub(operate, 0, 6)) == 'update' then
str = "UPDATE "..tb_name.." SET "
end
for k, v in pairs(tb_data) do
if type(v) == "number" then
str = str..string.format("%s = %d, ", k, v)
elseif type(v) == "string" then
str = str..string.format("%s = '%s', ", k, v)
end
end
str = string.sub(str, 0, string.len(str)-2).." "
return str
end
function _BASE.tonumber(self, str)
if str == nil then
return 0
else
return tonumber(str)
end
end
return _BASE
local resty_md5 = require "resty.md5"
function is_table_empty(t)
return t == nil or next(t) == nil
end
local function table_concate(data)
local keyt = {}
local s = ""
local key_table = {}
--取出所有的键
for key,_ in pairs(data) do
table.insert(key_table,key)
end
--对所有键进行排序
table.sort(key_table)
for _,key in pairs(key_table) do
if key ~= "mesgSign" then
if type(data[key]) == "table" then
s = s..table_concate(data[key])
else
s = s..key.."="..tostring(data[key]).."&"
end
end
end
return s
end
local function check_mesg(data)
local ret = true
if data['mesgType'] == nil then
ngx.say('mesgType is missing')
ret = false
elseif #data['mesgType'] > 32 then
ngx.say('mesgType is too long')
ret = false
elseif type(data['mesgType']) ~= "string"
or ngx.re.match(data['mesgType'], "([^a-zA-Z]+)", 'jo')
then
ngx.say('mesgType has invalid format')
ret = false
end
if data['mesgTime'] == nil then
ngx.say('mesgTime is missing')
ret = false
elseif #data['mesgTime'] ~= 14 then
ngx.say('mesgTime has invalid length')
ret = false
elseif type(data["mesgTime"]) ~= "string"
or ngx.re.match(data["mesgTime"], "[^0-9]", 'jo') then
ngx.say('mesgTime has invalid format')
ret = false
end
if data['mesgChannel'] == nil then
ngx.say('mesgChannel is missing')
ret = false
elseif #data['mesgChannel'] > 8 then
ngx.say('mesgChannel is too long')
ret = false
elseif type(data['mesgChannel']) ~= "string"
-- or is_table_empty(ngx.re.match(data['mesgChannel'], [[(Web|App|Casher)]]), 'jo')
then
ngx.say('mesgChannel has invalid format')
ret = false
end
if data['mesgVersion'] == nil then
ngx.say('mesgVersion is missing')
ret = false
elseif #data['mesgVersion'] > 8 then
ngx.say('mesgVersion is too long')
ret = false
elseif type(data['mesgVersion']) ~= "string"
or ngx.re.match(data['mesgVersion'], "[^0-9Vv.]", 'jo')
then
ngx.say('mesgVersion has invalid format')
ret = false
end
if data['mesgLanguage'] == nil then
ngx.say('mesgLanguage is missing')
ret = false
elseif #data['mesgLanguage'] > 32 then
ngx.say('mesgLanguage is too long')
ret = false
elseif type(data['mesgLanguage']) ~= "string"
or ngx.re.match(data['mesgLanguage'], "[^a-zA-Z_.0-9-]")
then
ngx.say('mesgLanguage has invalid format')
ret = false
end
if data['mesgBody'] == nil then
ngx.say('mesgBody is missing')
ret = false
end
if data['mesgSign'] == nil then
ngx.say('mesgSign is missing')
ret = false
elseif #data['mesgSign'] ~= 32 then
ngx.say('mesgSign has invalid length')
ret = false
elseif type(data['mesgSign']) ~= "string"
or ngx.re.match(data['mesgSign'], "[^A-Z0-9]", 'jo')
then
ngx.say('mesgSign has invalid format')
ret = false
end
return ret
end
local json = require "cjson"
local data = json.decode(ngx.var.request_body)
local s = ""
if ngx.var.content_type == "application/json" then
if check_mesg(data) == false then
ngx.eof()
return;
end
s = table_concate(data)
s = string.sub(s, 1,#s-1)
else
ngx.say("invalid content_type")
ngx.eof()
end
ngx.say("md5 signed: ", s)
local md5 = resty_md5:new()
if not md5 then
ngx.say("failed to create md5 object")
return
end
local ok = md5:update(s)
if not ok then
ngx.say("failed to add data")
return
end
local digest = md5:final()
local str = require "resty.string"
local sign=string.upper(str.to_hex(digest))
if sign ~= data['mesgSign'] then
ngx.say("md5 result: false")
-- ngx.say("mesgSign expect: ", sign)
else
ngx.say("SUCCESS")
end
ngx.eof()
local _ASB = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local json = require('cjson')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function asb_sql(db_st, interval)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[ASB]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_ASB]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- INSERT INTO history table
local sql_update = string.format([[INSERT INTO node_asbtail_his SELECT * FROM node_payment ]] ..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month) )]],
os.date("%Y%m%d"), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_ASB]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_ASB", sql_update)
return
end
-- DELETE current tables record
local sql_delete = string.format([[DELETE FROM node_asb ]]..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month))]],
os.date('%Y%m%d'), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_ASB]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_ASB", sql_delete)
return
end
end
local function async(db_st, interval)
local co = coroutine.wrap(
function()
asb_sql(db_st, interval)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _ASB.run()
local dbs_st = base:db_read(db_conf)
local interval = 3 -- the month interval
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], interval)
end
dispatch()
end
return _ASB
local _ASBDETAIL = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require('base')
local json = require('cjson')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function asbdetail_sql(db_st, interval)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[ASBDETAIL]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_ASBDETAIL]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- INSERT INTO history table
local sql_update = string.format([[INSERT INTO node_asbdetailtail_his SELECT * FROM node_payment ]] ..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month) )]],
os.date("%Y%m%d"), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_ASBDETAIL]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_ASBDETAIL", sql_update)
return
end
-- DELETE current tables record
local sql_delete = string.format([[DELETE FROM node_asbdetail ]]..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month))]],
os.date('%Y%m%d'), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_ASBDETAIL]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_ASBDETAIL", sql_delete)
return
end
end
local function async(db_st, interval)
local co = coroutine.wrap(
function()
asbdetail_sql(db_st, interval)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _ASBDETAIL.run()
local dbs_st = base:db_read(db_conf)
local interval = 3 -- the month interval
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], interval)
end
dispatch()
end
return _ASBDETAIL
local _NC = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local json = require('cjson')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function card_sql(db_st)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[CARD]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_CARD]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- DELETE invalid rows
-- local sql_delete = [[DELETE FROM node_card WHERE (card_status != 0 AND card_status !=1) or now() > card_exp]]
local sql_update = [[UPDATE node_card SET card_status = 8 WHERE now() > card_exp]]
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_CARD]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_CARD", sql_update)
return
end
-- SET yesterday's balance
local sql_update = [[UPDATE node_card SET card_lbal_amt = card_cbal_amt, card_lbal_count = card_cbal_count]]
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_CARD]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_CARD", sql_update)
return
end
-- INSERT INTO history table
local sql_insert = string.format([[INSERT INTO node_card_his SELECT * FROM node_card WHERE DATE(`create_time`) < DATE('%s')]], os.date("%Y%m%d%H%M%S"))
local res, err, errcode, sqlstate = db:query(sql_insert)
if not res then
base:log("S", "[NODE_CARD]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_CARD", sql_insert)
return
end
-- DELETE current tables record
local sql_delete = string.format([[DELETE FROM node_card WHERE DATE(`create_time`) < DATE('%s')]], os.date("%Y%m%d%H%M%S"))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_CARD]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_CARD", sql_delete)
return
end
end
local function async(i)
local co = coroutine.wrap(
function()
card_sql(i)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _NC.run()
local dbs_st = base:db_read(db_conf)
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i])
end
dispatch()
end
return _NC
local _CR = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local json = require('cjson')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function chgrule_sql(db_st)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[CHGRULE]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_CHGRULE]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- DELETE invalid rows
-- local sql_delete = [[DELETE FROM node_CHGRULE WHERE (card_status != 0 AND card_status !=1) or now() > card_exp]]
local sql_update = [[UPDATE node_chgrule SET rule_status = 1 WHERE now() > `DATE_end`]]
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_CHGRULE]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_CHGRULE", sql_update)
return
end
end
local function async(i)
local co = coroutine.wrap(
function()
chgrule_sql(i)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _CR.run()
local dbs_st = base:db_read(db_conf)
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i])
end
dispatch()
end
return _CR
local NODE_CS = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require('base')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function cs_sql(db_st, interval)
base:log(MODULE, "-----------------------> START[node_cs]")
local db, err = mysql:new()
if not db then
base:log("S", "[CS]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_CS]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- INSERT INTO history table
local sql_update = string.format([[INSERT INTO node_cs_his SELECT * FROM node_payment ]] ..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month) )]],
os.date("%Y%m%d"), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_CS]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_CS", sql_update)
return
end
-- DELETE current tables record
local sql_delete = string.format(
[[DELETE FROM node_cs ]]..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month))]],
os.date('%Y%m%d'), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_CS]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_CS", sql_delete)
return
end
base:log(MODULE, "[NODE_SALEORDER]", "-----------------------> END[node_cs]")
end
local function async(db_st, interval)
local co = coroutine.wrap(
function()
cs_sql(db_st, interval)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _CS.run()
local dbs_st = base:db_read(db_conf)
local interval = 3 -- the month interval
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], interval)
end
dispatch()
end
return NODE_CS
local _CSDETAIL = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local json = require('cjson')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function csdetail_sql(db_st, interval)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[CSDETAIL]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_CSDETAIL]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- INSERT INTO history table
local sql_update = string.format([[INSERT INTO node_csdetailtail_his SELECT * FROM node_payment ]] ..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month) )]],
os.date("%Y%m%d"), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_CSDETAIL]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_CSDETAIL", sql_update)
return
end
-- DELETE current tables record
local sql_delete = string.format([[DELETE FROM node_csdetail ]]..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month))]],
os.date('%Y%m%d'), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_CSDETAIL]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_CSDETAIL", sql_delete)
return
end
end
local function async(db_st, interval)
local co = coroutine.wrap(
function()
csdetail_sql(db_st, interval)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _CSDETAIL.run()
local dbs_st = base:db_read(db_conf)
local interval = 3 -- the month interval
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], interval)
end
dispatch()
end
return _CSDETAIL
local _MS = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local json = require('cjson')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function ms_sql(db_st)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[MS]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_MS]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- UPDATE invalid rows
-- local sql_delete = [[DELETE FROM node_MS WHERE (card_status != 0 AND card_status !=1) or now() > card_exp]]
local sql_update = [[UPDATE node_ms SET ms_status = 1 WHERE now() > `ms_date_end`]]
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_MS]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_MS", sql_update)
return
end
end
local function async(i)
local co = coroutine.wrap(
function()
ms_sql(i)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _MS.run()
local dbs_st = base:db_read(db_conf)
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i])
end
dispatch()
end
return _MS
local _PAYMENT = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local json = require('cjson')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function payment_sql(db_st, interval)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[PAYMENT]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_PAYMENT]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- INSERT INTO history table
local sql_update = string.format([[INSERT INTO node_payment_his SELECT * FROM node_payment ]] ..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month) )]],
os.date("%Y%m%d"), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_PAYMENT]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_PAYMENT", sql_update)
return
end
-- DELETE current tables record
local sql_delete = string.format([[DELETE FROM node_payment ]]..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month))]],
os.date('%Y%m%d'), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_PAYMENT]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_PAYMENT", sql_delete)
return
end
end
local function async(db_st, interval)
local co = coroutine.wrap(
function()
payment_sql(db_st, interval)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _PAYMENT.run()
local dbs_st = base:db_read(db_conf)
local interval = 3 -- the month interval
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], interval)
end
dispatch()
end
return _PAYMENT
local _PSB = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local tasks = {}
local db_conf = "tk7_dbs.json"
local function psb_sql(db_st, interval)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[PSB]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_PSB]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- INSERT INTO history table
local sql_update = string.format([[INSERT INTO node_psb_his SELECT * FROM node_payment ]] ..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month) )]],
os.date("%Y%m%d"), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_PSB]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_PSB", sql_update)
return
end
-- DELETE current tables record
local sql_delete = string.format([[DELETE FROM node_psb ]]..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month))]],
os.date('%Y%m%d'), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_PSB]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_PSB", sql_delete)
return
end
end
local function async(db_st, interval)
local co = coroutine.wrap(
function()
psb_sql(db_st, interval)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _PSB.run()
local base = require('base')
local json = require('cjson')
local dbs_st = base:db_read(db_conf)
local interval = 3 -- the month interval
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], interval)
end
dispatch()
end
return _PSB
local _PSBDETAIL = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local tasks = {}
local db_conf = "tk7_dbs.json"
local function psbdetail_sql(db_st, interval)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[PSBDETAIL]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_PSBDETAIL]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- INSERT INTO history table
local sql_update = string.format([[INSERT INTO node_psbdetail_his SELECT * FROM node_payment ]] ..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month) )]],
os.date("%Y%m%d"), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_PSBDETAIL]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_PSBDETAIL", sql_update)
return
end
-- DELETE current tables record
local sql_delete = string.format([[DELETE FROM node_psbdetail ]]..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month))]],
os.date('%Y%m%d'), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_PSBDETAIL]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_PSBDETAIL", sql_delete)
return
end
end
local function async(db_st, interval)
local co = coroutine.wrap(
function()
psbdetail_sql(db_st, interval)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _PSBDETAIL.run()
local base = require('base')
local json = require('cjson')
local dbs_st = base:db_read(db_conf)
local interval = 3 -- the month interval
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], interval)
end
dispatch()
end
return _PSBDETAIL
local _RCHG = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local json = require('cjson')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function rchg_sql(db_st, interval)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[RCHG]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_RCHG]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- INSERT INTO history table
local sql_update = string.format([[INSERT INTO node_rchg_his SELECT * FROM node_rchg ]] ..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month) )]],
os.date("%Y%m%d"), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_RCHG]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_RCHG", sql_update)
return
end
-- DELETE current tables record
local sql_delete = string.format([[DELETE FROM node_rchg ]]..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month))]],
os.date('%Y%m%d'), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_RCHG]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_RCHG", sql_delete)
return
end
end
local function async(db_st, interval)
local co = coroutine.wrap(
function()
rchg_sql(db_st, interval)
end)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _RCHG.run()
local dbs_st = base:db_read(db_conf)
local interval = 3
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], interval) -- interval should be months
end
dispatch()
end
return _RCHG
local _RCHGDETAIL = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local json = require('cjson')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function rchgdetail_sql(db_st, interval)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[RCHGDETAIL]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_RCHGDETAIL]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- INSERT INTO history table
local sql_update = string.format([[INSERT INTO node_rchgdetail_his SELECT * FROM node_payment ]] ..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month) )]],
os.date("%Y%m%d"), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_RCHGDETAIL]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_RCHGDETAIL", sql_update)
return
end
-- DELETE current tables record
local sql_delete = string.format([[DELETE FROM node_rchgdetail ]]..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month))]],
os.date('%Y%m%d'), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_RCHGDETAIL]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_RCHGDETAIL", sql_delete)
return
end
end
local function async(db_st, interval)
local co = coroutine.wrap(
function()
rchgdetail_sql(db_st, interval)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _RCHGDETAIL.run()
local dbs_st = base:db_read(db_conf)
local interval = 3 -- the month interval
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], interval)
end
dispatch()
end
return _RCHGDETAIL
local _SALEDETAIL = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local json = require('cjson')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function saledetail_sql(db_st, interval)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[SALEDETAIL]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_SALEDETAIL]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- INSERT INTO history table
local sql_update = string.format([[INSERT INTO node_saledetail_his SELECT * FROM node_payment ]] ..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month) )]],
os.date("%Y%m%d"), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_SALEDETAIL]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_SALEDETAIL", sql_update)
return
end
-- DELETE current tables record
local sql_delete = string.format([[DELETE FROM node_saledetail ]]..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month))]],
os.date('%Y%m%d'), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_SALEDETAIL]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_SALEDETAIL", sql_delete)
return
end
end
local function async(db_st, interval)
local co = coroutine.wrap(
function()
saledetail_sql(db_st, interval)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _SALEDETAIL.run()
local dbs_st = base:db_read(db_conf)
local interval = 3 -- the month interval
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], interval)
end
dispatch()
end
return _SALEDETAIL
local _SALEORDER = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local MODULE = "NODE"
local mysql = require("resty.mysql")
local base = require("base")
local db_conf = "tk7_dbs.json"
local tasks = {}
local function saleorder_sql(db_st, interval)
base:log(MODULE, "-----------------------> START[node_saleorder]")
local db, err = mysql:new()
if not db then
base:log(MODULE, "[SALEORDER]", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log(MODULE, "[NODE_SALEORDER]", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- INSERT INTO history table
local sql_update = string.format([[INSERT INTO node_saleorder_his SELECT * FROM node_payment ]] ..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month) )]],
os.date("%Y%m%d"), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log(MODULE, "[NODE_SALEORDER]", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log(MODULE, "[NODE_SALEORDER]", sql_update)
return
end
-- DELETE current tables record
local sql_delete = string.format(
[[DELETE FROM node_saleorder ]]..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month))]],
os.date('%Y%m%d'), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log(MODULE, "[NODE_SALEORDER]", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log(MODULE, "[NODE_SALEORDER]", sql_delete)
return
end
base:log(MODULE, "[NODE_SALEORDER]", "-----------------------> END[node_saleorder]")
end
local function async(db_st, interval)
local co = coroutine.wrap(
function()
saleorder_sql(db_st, interval)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _SALEORDER.run()
local dbs_st = base:db_read(db_conf)
local interval = 3 -- the month interval
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], interval)
end
dispatch()
end
return _SALEORDER
local _SHIFT = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local json = require('cjson')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function shift_sql(db_st, interval)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[SHIFT]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_SHIFT]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- INSERT INTO history table
local sql_update = string.format([[INSERT INTO node_shift_his SELECT * FROM node_payment ]] ..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month) )]],
os.date("%Y%m%d"), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_SHIFT]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_SHIFT", sql_update)
return
end
-- DELETE current tables record
local sql_delete = string.format([[DELETE FROM node_shift ]]..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month))]],
os.date('%Y%m%d'), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_SHIFT]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_SHIFT", sql_delete)
return
end
end
local function async(db_st, interval)
local co = coroutine.wrap(
function()
shift_sql(db_st, interval)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _SHIFT.run()
local dbs_st = base:db_read(db_conf)
local interval = 3 -- the month interval
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], interval)
end
dispatch()
end
return _SHIFT
local _SKU = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local json = require('cjson')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function sku_sql(db_st)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[SKU]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_SKU]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- DELETE invalid sku records
local sql_update = [[DELETE FROM node_sku WHERE sku_exp_flag = 9]]
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_SKU]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_SKU", sql_update)
return
end
end
local function async(i)
local co = coroutine.wrap(
function()
sku_sql(i)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _SKU.run()
local dbs_st = base:db_read(db_conf)
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i])
end
dispatch()
end
return _SKU
local _SMS = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local json = require('cjson')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function sms_sql(db_st, interval)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[SMS]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_SMS]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- INSERT INTO history table
local sql_update = string.format([[INSERT INTO node_sms_his SELECT * FROM node_payment ]] ..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month) )]],
os.date("%Y%m%d"), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_SMS]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_SMS", sql_update)
return
end
-- DELETE current tables record
local sql_delete = string.format([[DELETE FROM node_sms ]]..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month))]],
os.date('%Y%m%d'), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_SMS]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_SMS", sql_delete)
return
end
end
local function async(db_st, interval)
local co = coroutine.wrap(
function()
sms_sql(db_st, interval)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _SMS.run()
local dbs_st = base:db_read(db_conf)
local interval = 3 -- the month interval
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], interval)
end
dispatch()
end
return _SMS
local _SMSVIP = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local json = require('cjson')
local db_conf = "tk7_dbs.json"
local tasks = {}
local function smsvip_sql(db_st, interval)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[SMSVIP]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_SMSVIP]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- INSERT INTO history table
local sql_update = string.format([[INSERT INTO node_smsvip_his SELECT * FROM node_payment ]] ..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month) )]],
os.date("%Y%m%d"), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_SMSVIP]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_SMSVIP", sql_update)
return
end
-- DELETE current tables record
local sql_delete = string.format([[DELETE FROM node_smsvip ]]..
[[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) < EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', interval %s month))]],
os.date('%Y%m%d'), tostring(interval))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_SMSVIP]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_SMSVIP", sql_delete)
return
end
end
local function async(db_st, interval)
local co = coroutine.wrap(
function()
smsvip_sql(db_st, interval)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _SMSVIP.run()
local dbs_st = base:db_read(db_conf)
local interval = 3 -- the month interval
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], interval)
end
dispatch()
end
return _SMSVIP
local _NT = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local tasks = {}
local db_conf = "tk7_dbs.json"
local function ticket_sql(db_st)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[TICKET]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_TICKET]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- DELETE invalid rows
-- local sql_delete = [[DELETE FROM node_ticket WHERE (ticket_status != 0 AND ticket_status !=1) or now() > ticket_exp]]
local sql_update = [[UPDATE node_ticket SET ticket_status = 8 WHERE now() > ticket_exp]]
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_TICKET]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_TICKET", sql_update)
return
end
-- INSERT INTO history table
local sql_insert = string.format([[INSERT INTO node_ticket_his SELECT * FROM node_ticket WHERE DATE(`create_time`) < DATE('%s')]], os.date("%Y%m%d%H%M%S"))
local res, err, errcode, sqlstate = db:query(sql_insert)
if not res then
base:log("S", "[NODE_TICKET]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_TICKET", sql_insert)
return
end
-- DELETE current tables record
local sql_delete = string.format([[DELETE FROM node_ticket WHERE DATE(`create_time`) < DATE('%s')]], os.date("%Y%m%d%H%M%S"))
local res, err, errcode, sqlstate = db:query(sql_delete)
if not res then
base:log("S", "[NODE_TICKET]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_TICKET", sql_delete)
return
end
end
local function async(i)
local co = coroutine.wrap(
function()
ticket_sql(i)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _NT.run()
local base = require('base')
local json = require('cjson')
local dbs_st = base:db_read(db_conf)
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i])
end
dispatch()
end
return _NT
local _VIPACCT = {}
package.path = package.path..";openresty/luabiz/?.lua;;"
local mysql = require("resty.mysql")
local base = require("base")
local tasks = {}
local db_conf = "tk7_dbs.json"
local function vipacct_sql(db_st)
local log = ngx.log
local ERR = ngx.ERR
local db, err = mysql:new()
if not db then
base:log("S", "[VIPACCT]:", "failed to instantiate mysql: ", err)
return false
end
db:SET_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{
host = db_st['host'],
port = db_st['port'],
database = db_st['database'],
user = db_st['user'],
password = db_st['password'],
timeout = db_st['timeout'],
}
if not ok then
base:log("S", "[NODE_VIPACCT]:", "failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- SET yesterday's balance
local sql_update = [[UPDATE node_vipacct SET `point_lbal` = `point_cbal`, `acct_lbal` = `acct_cbal`, ]]..
[[`acct_lbal` = `acct_cbal`, `count_lbal` = `count_cbal`]]
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
base:log("S", "[NODE_VIPACCT]:", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
base:log("NODE_VIPACCT", sql_update)
return
end
-- SET VIPACCT's level
end
local function async(i)
local co = coroutine.wrap(
function()
vipacct_sql(i)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _VIPACCT.run()
local base = require('base')
local json = require('cjson')
local dbs_st = base:db_read(db_conf)
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i])
end
dispatch()
end
return _VIPACCT
package.path = package.path..";openresty/luabiz/?.lua;;"
package.path = package.path..";openresty/luabiz/settle/?.lua;;"
local base = require('base')
-- Settle modules
local node_asb = require('node_asb')
local node_asbdetail = require('node_asbdetail')
local node_card = require('node_card')
local node_chgrule = require('node_chgrule')
local node_cs = require('node_cs')
local node_csdetail = require('node_csdetail')
local node_ms = require('node_ms')
local node_payment = require('node_payment')
local node_psb = require('node_psb')
local node_psbdetail = require('node_psbdetail')
local node_rchg = require('node_rchg')
local node_rchgdetail = require('node_rchgdetail')
local node_saledetail = require('node_saledetail')
local node_saleorder = require('node_saleorder')
local node_shift = require('node_shift')
local node_sku = require('node_sku')
local node_sms = require('node_sms')
local node_smsvip = require('node_smsvip')
local node_ticket = require('node_ticket')
local node_vipacct = require('node_vipacct')
local tk_sdt = require('tk_sdt')
local delay = 5 -- delaytime
local interval_time = '1500' -- time to run, here is 12:00 for each day
local handler
handler = function(premature, interval_time)
if not premature then
local nowtime = os.date("%H%M")
if nowtime == interval_time then
-- settle features start
-- node_asb: run()
-- node_asbdetail: run()
-- node_card: run()
-- node_chgrule: run()
-- node_cs: run()
-- node_csdetail: run()
-- node_ms: run()
-- node_payment: run()
-- node_psb: run()
-- node_psbdetail: run()
-- node_rchg: run()
-- node_rchgdetail: run()
-- node_saledetail: run()
-- node_saleorder: run()
-- node_shift: run()
-- node_sku: run()
-- node_sms: run()
-- node_smsvip: run()
-- node_ticket: run()
-- node_vipacct: run()
tk_sdt: run()
-- reset interval_time
interval_time = tostring(tonumber(interval_time) - 1)
end
local ok, err = ngx.timer.at(delay, handler, interval_time)
if not ok then
ngx.log(ngx.ERR, "failed to create the timer:", err)
return
end
end
end
if ngx.worker.id() == 1 then
local ok, err = ngx.timer.at(delay, handler, interval_time)
if not ok then
ngx.log(ngx.ERR, "failed to create the timer:", err)
return
end
end
This diff is collapsed.
local _SYNC = {}
local mysql = require("resty.mysql")
local json = require 'cjson'
local dbs = {
host = "127.0.0.1" ,
port = 3306,
db ="zzd_tk",
user="root",
password = "",
timeout = 30
}
local function close_db(db)
if not db then
return
end
db:close()
end
-- errno meaning
-- 0x00 SUCCESS
-- 0x01
-- 0x02
-- 0x03
-- 0x04
-- ....
-- 0xFF
local errno = 0
local function base_sql_con(req)
local ok = true
local err = 0
local msg = "SUCCESS"
local str = ""
for k, v in pairs(req.configs) do
if type(v) == "number" then
str = str..string.format("%s=%d,", k, v)
elseif type(v) == "string" then
str = str..string.format("%s='%s',", k, v)
end
end
str = string.sub(str, 0, string.len(str)-1).." "
return ok, err, msg, str
end
local function sync_tk_lang(req)
local str = string.format("INSERT INTO TK_LANG(LANG_CODE, LANG_NAME) VALUES('%s', '%s')",
req['LANG_CODE'], req['LANG_NAME'])
return true, str
end
local function sync_node_mcfg(req)
local ok = true
local err = 0
local msg = "SUCCESS"
local str = ""
local tmp
str = string.format("UPDATE NODE_MCFG SET ")
ok, err, msg, tmp = base_sql_con(req)
str = str..tmp
if type(req.configs['MER_MAP_ID']) == "number" then
str = str..string.format("WHERE MER_MAP_ID = %s", req.configs['MER_MAP_ID'])
elseif type(req.configs['MER_MAP_ID']) == "string" then
str = str..string.format("WHERE MER_MAP_ID = '%s'", req.configs['MER_MAP_ID'])
end
return ok, err, msg, str
end
local function sync_bizz_switch(req)
local ok = true
local err = 0
local msg = "SUCCESS"
local str = ""
if req['method'] == 'TK_LANG' then
ok, err, msg, str = sync_tk_lang(req)
elseif req['method'] == 'NODE_MCFG' then
ok, err, msg, str = sync_node_mcfg(req)
end
ngx.say(str)
ngx.eof()
return ok, err, msg, str
end
local function sync_err_tell(resp, err, msg)
resp["code"] = err
if errno == 0x00 then
resp["message"] = msg
end
table.sort(resp)
ngx.say(json.encode(resp))
ngx.eof()
return 0
end
local function do_sql(str)
local ok = true
local err = 0
local msg = "SUCCESS"
local db, err = mysql:new ()
if not db then
ngx.say("new mysql error: ", err)
ngx.eof()
return false, err, "FAIL"
end
db:set_timeout(1000)
local props = {
host = db['host'],
port = db['port'],
database = db['database'],
user = db['user'],
password = db['password']
}
local ok, msg, errno, sqlstate = db:connect(props)
if not ok then
-- msg = string.format("connect to mysql error: ", err, " , errno : ", errno, " , sqlstate : ", sqlstate)
return ok, err, msg
end
ok, msg, errno, sqlstate = db:query(str)
if not ok then
ngx.sqy(str)
-- msg = string.format("insert error : ", err, " , errno : ", errno, " , sqlstate : ", sqlstate)
close_db(db)
return ok, err, msg
end
close_db(db)
return ok, err, msg
end
local function main()
local ok = true
local err = 0
local msg = "SUCCESS"
local str = ""
local resp = {}
local req = json.decode(ngx.var.request_body)
ok, err, msg, str = sync_bizz_switch(req)
if not ok then
return sync_err_tell(resp, err, msg)
end
ok, err, msg = do_sql(str)
return sync_err_tell(resp, err, msg)
end
main()
return _SYNC
user root;
user zhoush;
# user nobody;
worker_processes auto;
......@@ -31,7 +31,7 @@ http {
#keepalive_timeout 0;
keepalive_timeout 65;
init_worker_by_lua_file "/www/openresty/luabiz/settle/settle.lua";
init_worker_by_lua_file "luabiz/settle/settle.lua";
#gzip on;
server {
......@@ -111,8 +111,8 @@ http {
location /test {
content_by_lua_block {
package.path = package.path..";openresty/luabiz/?.lua;;"
package.path = package.path..";openresty/luabiz/settle/?.lua;;"
package.path = package.path..";luabiz/?.lua;;"
package.path = package.path..";luabiz/settle/?.lua;;"
tk_sdt = require ("tk_sdt")
tk_sdt:run()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment