Commit 96335ba7 authored by 周尚's avatar 周尚

TK_SDT表汇总后同步到总控机

parent acf66189
...@@ -44,3 +44,4 @@ ...@@ -44,3 +44,4 @@
2. [X] 数据同步 2. [X] 数据同步
1. [X] 门店信息 1. [X] 门店信息
2. [X] 款台信息 2. [X] 款台信息
3. [X] 门店汇总
...@@ -74,8 +74,8 @@ http { ...@@ -74,8 +74,8 @@ http {
location /test { location /test {
content_by_lua_block { content_by_lua_block {
local node_vip = loadmod("node_vipacct") local tk_sdt = loadmod("tk_sdt")
node_vip:run() tk_sdt:run()
} }
} }
......
local _SDT = {} local TK_SDT = {}
local MODULE="S" -- Settle local MODULE="S" -- Settle
...@@ -14,33 +14,60 @@ local tasks = {} ...@@ -14,33 +14,60 @@ local tasks = {}
-- 门店统计 -- 门店统计
-- @param table db_st 数据库对象 -- @param table tk_biz 节点机数据库对象
-- table tk_control 总控机数据库对象
-- table tk_database 数据同步数据库名
-- @return nil -- @return nil
local function sdt_sql(db_st) local function sdt_sql(tk_biz, tk_control)
zx_base:log(MODULE, "-----------------------> START[TK_SDT]") zx_base:log(MODULE, "-----------------------> START[TK_SDT]")
-- 新建mysql连接 -- 新建mysql连接
local db, err = mysql:new() local db_biz, err = mysql:new()
if not db then if not db_biz then
zx_base:log(MODULE, "[TK_SDT]", "failed to instantiate mysql: ", err) zx_base:log(MODULE, "[TK_SDT]", "failed to instantiate mysql: ", err)
return false return false
end end
-- 超时设置 -- 超时设置
db:set_timeout(1000) -- 1 sec db_biz:set_timeout(1000) -- 1 sec
-- 连接mysql -- 连接tk_biz
local ok, err, errcode, sqlstate = db:connect{ local ok, err, errcode, sqlstate = db_biz:connect{
host = db_st['host'], host = tk_biz['host'],
port = db_st['port'], port = tk_biz['port'],
database = db_st['database'], database = tk_biz['database'],
user = db_st['user'], user = tk_biz['user'],
password = db_st['password'], password = tk_biz['password'],
timeout = db_st['timeout'], timeout = tk_biz['timeout'],
charset = 'utf8' charset = 'utf8'
} }
if not ok then if not ok then
zx_base:log(MODULE, "[TK_SDT]", "failed to connect:", json.encode(db_st), ":", errcode, sqlstate) zx_base:log(MODULE, "[TK_SDT]", "failed to connect:", json.encode(tk_biz), ":", errcode, sqlstate)
return
end
-- 新建mysql连接
local db_control, err = mysql:new()
if not db_control then
zx_base:log(MODULE, "[TK_SDT]", "failed to instantiate mysql: ", err)
return false
end
-- 超时设置
db_control:set_timeout(1000) -- 1 sec
-- 连接tk_control
local ok, err, errcode, sqlstate = db_control:connect{
host = tk_control['host'],
port = tk_control['port'],
database = tk_control['database'],
user = tk_control['user'],
password = tk_control['password'],
timeout = tk_control['timeout'],
charset = 'utf8'
}
if not ok then
zx_base:log(MODULE, "[TK_SDT]", "failed to connect:", json.encode(tk_control), ":", errcode, sqlstate)
return return
end end
...@@ -48,9 +75,9 @@ local function sdt_sql(db_st) ...@@ -48,9 +75,9 @@ local function sdt_sql(db_st)
-- GET shop_map_id list -- GET shop_map_id list
local shops local shops
local sql_select = 'SELECT DISTINCT shop_map_id FROM node_saleorder UNION SELECT DISTINCT shop_map_id FROM node_rchg' local sql_select = 'SELECT DISTINCT shop_map_id FROM node_saleorder UNION SELECT DISTINCT shop_map_id FROM node_rchg'
shops, err, errcode, sqlstate = db:query(sql_select) shops, err, errcode, sqlstate = db_biz:query(sql_select)
if not shops then if not shops then
zx_base:log("S", "[TK_SDT]", "failed to query:", json.encode(db_st), ":", err, ":", errcode, ":", sqlstate, ".") zx_base:log("S", "[TK_SDT]", "failed to query:", json.encode(tk_biz), ":", err, ":", errcode, ":", sqlstate, ".")
zx_base:log("S", "[TK_SDT]", sql_select) zx_base:log("S", "[TK_SDT]", sql_select)
return return
end end
...@@ -64,9 +91,9 @@ local function sdt_sql(db_st) ...@@ -64,9 +91,9 @@ local function sdt_sql(db_st)
[[AND shop_map_id = %s]], [[AND shop_map_id = %s]],
os.date('%Y%m%d',os.time()), os.date('%Y%m%d',os.time()),
tostring(shop.shop_map_id)) tostring(shop.shop_map_id))
res, err, errcode, sqlstate = db:query(sql) res, err, errcode, sqlstate = db_biz:query(sql)
if not res then if not res then
zx_base:log("S", "[TK_SDT]", "failed to connect:", json.encode(db_st), ":", err, ":", errcode, ":", sqlstate, ".") zx_base:log("S", "[TK_SDT]", "failed to connect:", json.encode(tk_biz), ":", err, ":", errcode, ":", sqlstate, ".")
zx_base:log("S", "[TK_SDT]", sql) zx_base:log("S", "[TK_SDT]", sql)
return return
end end
...@@ -104,7 +131,6 @@ local function sdt_sql(db_st) ...@@ -104,7 +131,6 @@ local function sdt_sql(db_st)
os.date("%Y%m%d", os.time()-24*60*60), os.date("%Y%m%d", os.time()-24*60*60),
tostring(shop.shop_map_id)) tostring(shop.shop_map_id))
-- GET VIP/NOVIP records -- GET VIP/NOVIP records
sql = sql..string.format( sql = sql..string.format(
[[SELECT IFNULL(COUNT(*), 0) t_order_vip_cnt, IFNULL(SUM(amt_pay), 0) t_order_vip_amt ]].. [[SELECT IFNULL(COUNT(*), 0) t_order_vip_cnt, IFNULL(SUM(amt_pay), 0) t_order_vip_amt ]]..
...@@ -292,9 +318,9 @@ local function sdt_sql(db_st) ...@@ -292,9 +318,9 @@ local function sdt_sql(db_st)
-- tostring(shop.shop_map_id)) -- tostring(shop.shop_map_id))
res, err, errcode, sqlstate = db:query(sql) res, err, errcode, sqlstate = db_biz:query(sql)
if not res then if not res then
zx_base:log("S", "[TK_SDT]", "failed to query:", json.encode(db_st), ":", err, ":", errcode, ":", sqlstate, ".") zx_base:log("S", "[TK_SDT]", "failed to query:", json.encode(tk_biz), ":", err, ":", errcode, ":", sqlstate, ".")
zx_base:log("S", "[TK_SDT]", sql) zx_base:log("S", "[TK_SDT]", sql)
return return
end end
...@@ -306,7 +332,7 @@ local function sdt_sql(db_st) ...@@ -306,7 +332,7 @@ local function sdt_sql(db_st)
end end
while err == "again" do while err == "again" do
res, err, errcode, sqlstate = db:read_result() res, err, errcode, sqlstate = db_biz:read_result()
if not res then if not res then
zx_base:log(MODULE, "[TK_SDT]", "bad result #", i, ": ", err, ": ", errcode, ": ", sqlstate, ".") zx_base:log(MODULE, "[TK_SDT]", "bad result #", i, ": ", err, ": ", errcode, ": ", sqlstate, ".")
return ngx.exit(500) return ngx.exit(500)
...@@ -420,6 +446,7 @@ local function sdt_sql(db_st) ...@@ -420,6 +446,7 @@ local function sdt_sql(db_st)
if shop['t_order_amt'] == 0 or shop['t_order_cnt'] == 0 then if shop['t_order_amt'] == 0 or shop['t_order_cnt'] == 0 then
shop['t_order_avg_price'] = 0 shop['t_order_avg_price'] = 0
shop['t_order_profit_per'] = 0
else else
shop['t_order_avg_price'] = shop['t_order_amt'] / shop['t_order_cnt'] shop['t_order_avg_price'] = shop['t_order_amt'] / shop['t_order_cnt']
shop['t_order_profit_per'] = shop['t_order_profit'] / shop['t_order_amt'] * 10000 shop['t_order_profit_per'] = shop['t_order_profit'] / shop['t_order_amt'] * 10000
...@@ -428,7 +455,24 @@ local function sdt_sql(db_st) ...@@ -428,7 +455,24 @@ local function sdt_sql(db_st)
-- INSERT INTO tk_sdt -- INSERT INTO tk_sdt
local str = zx_base:sql_concate('insert into', 'tk_sdt', shop) local str = zx_base:sql_concate('insert into', 'tk_sdt', shop)
zx_base:log(MODULE, str)
-- INSERT INTO tk_biz database
res, err, errcode, sqlstate = db_biz:query(str)
if not res then
zx_base:log("S", "[TK_SDT]", "failed to query:", json.encode(tk_biz),
":", err, ":", errcode, ":", sqlstate, ".")
zx_base:log("S", "[TK_SDT]", str)
return
end
-- INSERT INTO tk_control database
res, err, errcode, sqlstate = db_control:query(str)
if not res then
zx_base:log("S", "[TK_SDT]", "failed to query:", json.encode(tk_control),
":", err, ":", errcode, ":", sqlstate, ".")
zx_base:log("S", "[TK_SDT]", str)
return
end
table.insert(shops, shop) table.insert(shops, shop)
end end
...@@ -438,17 +482,24 @@ end ...@@ -438,17 +482,24 @@ end
-- 异步运行
-- @param table tk_biz 节点机数据库对象
local function async(i) -- table tk_control 总控机数据库对象
-- table tk_database 数据同步数据库名
-- @return nil
local function async(tk_biz, tk_control, tk_database)
local co = coroutine.wrap( local co = coroutine.wrap(
function() function()
sdt_sql(i) sdt_sql(tk_biz, tk_control, tk_database)
end end
) )
table.insert(tasks, co) table.insert(tasks, co)
end end
-- 协程调度
-- @param nil
-- @return nil
local function dispatch() local function dispatch()
local i = 1 local i = 1
while true do while true do
...@@ -468,12 +519,16 @@ local function dispatch() ...@@ -468,12 +519,16 @@ local function dispatch()
end end
end end
function _SDT.run()
-- 跑批业务处理
-- @param nil
-- @return nil
function TK_SDT.run()
local dbs_st = zx_base:db_read(db_conf) local dbs_st = zx_base:db_read(db_conf)
for i = 1, #dbs_st.tk_biz do for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i]) async(dbs_st.tk_biz[i], dbs_st.tk_control)
end end
dispatch() dispatch()
end end
return _SDT return TK_SDT
...@@ -9,7 +9,7 @@ local json = loadmod('cjson') ...@@ -9,7 +9,7 @@ local json = loadmod('cjson')
local db_conf = "tk7_dbs.json" local db_conf = "tk7_dbs.json"
-- 协 -- 协
local tasks = {} local tasks = {}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment