Commit 264197f6 authored by 周尚's avatar 周尚

using syslog-ng instead

parent c5379d91
-- Copyright (C) 2013-2014 Jiale Zhi (calio), CloudFlare Inc.
--require "luacov"
local concat = table.concat
local tcp = ngx.socket.tcp
local udp = ngx.socket.udp
local timer_at = ngx.timer.at
local ngx_log = ngx.log
local ngx_sleep = ngx.sleep
local type = type
local pairs = pairs
local tostring = tostring
local debug = ngx.config.debug
local DEBUG = ngx.DEBUG
local CRIT = ngx.CRIT
local MAX_PORT = 65535
-- table.new(narr, nrec)
local succ, new_tab = pcall(require, "table.new")
if not succ then
new_tab = function () return {} end
end
local _M = new_tab(0, 5)
local is_exiting
if not ngx.config or not ngx.config.ngx_lua_version
or ngx.config.ngx_lua_version < 9003 then
is_exiting = function() return false end
ngx_log(CRIT, "We strongly recommend you to update your ngx_lua module to "
.. "0.9.3 or above. lua-resty-logger-socket will lose some log "
.. "messages when Nginx reloads if it works with ngx_lua module "
.. "below 0.9.3")
else
is_exiting = ngx.worker.exiting
end
_M._VERSION = '0.03'
-- user config
local flush_limit = 4096 -- 4KB
local drop_limit = 1048576 -- 1MB
local timeout = 1000 -- 1 sec
local host
local port
local ssl = false
local ssl_verify = true
local sni_host
local path
local max_buffer_reuse = 10000 -- reuse buffer for at most 10000
-- times
local periodic_flush = nil
local need_periodic_flush = nil
local sock_type = 'tcp'
-- internal variables
local buffer_size = 0
-- 2nd level buffer, it stores logs ready to be sent out
local send_buffer = ""
-- 1st level buffer, it stores incoming logs
local log_buffer_data = new_tab(20000, 0)
-- number of log lines in current 1st level buffer, starts from 0
local log_buffer_index = 0
local last_error
local connecting
local connected
local exiting
local retry_connect = 0
local retry_send = 0
local max_retry_times = 3
local retry_interval = 100 -- 0.1s
local pool_size = 10
local flushing
local logger_initted
local counter = 0
local ssl_session
local function _write_error(msg)
last_error = msg
end
local function _do_connect()
local ok, err, sock
if not connected then
if (sock_type == 'udp') then
sock, err = udp()
else
sock, err = tcp()
end
if not sock then
_write_error(err)
return nil, err
end
sock:settimeout(timeout)
end
-- "host"/"port" and "path" have already been checked in init()
if host and port then
if (sock_type == 'udp') then
ok, err = sock:setpeername(host, port)
else
ok, err = sock:connect(host, port)
end
elseif path then
ok, err = sock:connect("unix:" .. path)
end
if not ok then
return nil, err
end
return sock
end
local function _do_handshake(sock)
if not ssl then
return sock
end
local session, err = sock:sslhandshake(ssl_session, sni_host or host,
ssl_verify)
if not session then
return nil, err
end
ssl_session = session
return sock
end
local function _connect()
local err, sock
if connecting then
if debug then
ngx_log(DEBUG, "previous connection not finished")
end
return nil, "previous connection not finished"
end
connected = false
connecting = true
retry_connect = 0
while retry_connect <= max_retry_times do
sock, err = _do_connect()
if sock then
sock, err = _do_handshake(sock)
if sock then
connected = true
break
end
end
if debug then
ngx_log(DEBUG, "reconnect to the log server: ", err)
end
-- ngx.sleep time is in seconds
if not exiting then
ngx_sleep(retry_interval / 1000)
end
retry_connect = retry_connect + 1
end
connecting = false
if not connected then
return nil, "try to connect to the log server failed after "
.. max_retry_times .. " retries: " .. err
end
return sock
end
local function _prepare_stream_buffer()
local packet = concat(log_buffer_data, "", 1, log_buffer_index)
send_buffer = send_buffer .. packet
log_buffer_index = 0
counter = counter + 1
if counter > max_buffer_reuse then
log_buffer_data = new_tab(20000, 0)
counter = 0
if debug then
ngx_log(DEBUG, "log buffer reuse limit (" .. max_buffer_reuse
.. ") reached, create a new \"log_buffer_data\"")
end
end
end
local function _do_flush()
local ok, err, sock, bytes
local packet = send_buffer
sock, err = _connect()
if not sock then
return nil, err
end
bytes, err = sock:send(packet)
if not bytes then
-- "sock:send" always closes current connection on error
return nil, err
end
if debug then
ngx.update_time()
ngx_log(DEBUG, ngx.now(), ":log flush:" .. bytes .. ":" .. packet)
end
if (sock_type ~= 'udp') then
ok, err = sock:setkeepalive(0, pool_size)
if not ok then
return nil, err
end
end
return bytes
end
local function _need_flush()
if buffer_size > 0 then
return true
end
return false
end
local function _flush_lock()
if not flushing then
if debug then
ngx_log(DEBUG, "flush lock acquired")
end
flushing = true
return true
end
return false
end
local function _flush_unlock()
if debug then
ngx_log(DEBUG, "flush lock released")
end
flushing = false
end
local function _flush()
local err
-- pre check
if not _flush_lock() then
if debug then
ngx_log(DEBUG, "previous flush not finished")
end
-- do this later
return true
end
if not _need_flush() then
if debug then
ngx_log(DEBUG, "no need to flush:", log_buffer_index)
end
_flush_unlock()
return true
end
-- start flushing
retry_send = 0
if debug then
ngx_log(DEBUG, "start flushing")
end
local bytes
while retry_send <= max_retry_times do
if log_buffer_index > 0 then
_prepare_stream_buffer()
end
bytes, err = _do_flush()
if bytes then
break
end
if debug then
ngx_log(DEBUG, "resend log messages to the log server: ", err)
end
-- ngx.sleep time is in seconds
if not exiting then
ngx_sleep(retry_interval / 1000)
end
retry_send = retry_send + 1
end
_flush_unlock()
if not bytes then
local err_msg = "try to send log messages to the log server "
.. "failed after " .. max_retry_times .. " retries: "
.. err
_write_error(err_msg)
return nil, err_msg
else
if debug then
ngx_log(DEBUG, "send " .. bytes .. " bytes")
end
end
buffer_size = buffer_size - #send_buffer
send_buffer = ""
return bytes
end
local function _periodic_flush(premature)
if premature then
exiting = true
end
if need_periodic_flush or exiting then
-- no regular flush happened after periodic flush timer had been set
if debug then
ngx_log(DEBUG, "performing periodic flush")
end
_flush()
else
if debug then
ngx_log(DEBUG, "no need to perform periodic flush: regular flush "
.. "happened before")
end
need_periodic_flush = true
end
timer_at(periodic_flush, _periodic_flush)
end
local function _flush_buffer()
local ok, err = timer_at(0, _flush)
need_periodic_flush = false
if not ok then
_write_error(err)
return nil, err
end
end
local function _write_buffer(msg, len)
log_buffer_index = log_buffer_index + 1
log_buffer_data[log_buffer_index] = msg
buffer_size = buffer_size + len
return buffer_size
end
function _M.init(user_config)
if (type(user_config) ~= "table") then
return nil, "user_config must be a table"
end
for k, v in pairs(user_config) do
if k == "host" then
if type(v) ~= "string" then
return nil, '"host" must be a string'
end
host = v
elseif k == "port" then
if type(v) ~= "number" then
return nil, '"port" must be a number'
end
if v < 0 or v > MAX_PORT then
return nil, ('"port" out of range 0~%s'):format(MAX_PORT)
end
port = v
elseif k == "path" then
if type(v) ~= "string" then
return nil, '"path" must be a string'
end
path = v
elseif k == "sock_type" then
if type(v) ~= "string" then
return nil, '"sock_type" must be a string'
end
if v ~= "tcp" and v ~= "udp" then
return nil, '"sock_type" must be "tcp" or "udp"'
end
sock_type = v
elseif k == "flush_limit" then
if type(v) ~= "number" or v < 0 then
return nil, 'invalid "flush_limit"'
end
flush_limit = v
elseif k == "drop_limit" then
if type(v) ~= "number" or v < 0 then
return nil, 'invalid "drop_limit"'
end
drop_limit = v
elseif k == "timeout" then
if type(v) ~= "number" or v < 0 then
return nil, 'invalid "timeout"'
end
timeout = v
elseif k == "max_retry_times" then
if type(v) ~= "number" or v < 0 then
return nil, 'invalid "max_retry_times"'
end
max_retry_times = v
elseif k == "retry_interval" then
if type(v) ~= "number" or v < 0 then
return nil, 'invalid "retry_interval"'
end
-- ngx.sleep time is in seconds
retry_interval = v
elseif k == "pool_size" then
if type(v) ~= "number" or v < 0 then
return nil, 'invalid "pool_size"'
end
pool_size = v
elseif k == "max_buffer_reuse" then
if type(v) ~= "number" or v < 0 then
return nil, 'invalid "max_buffer_reuse"'
end
max_buffer_reuse = v
elseif k == "periodic_flush" then
if type(v) ~= "number" or v < 0 then
return nil, 'invalid "periodic_flush"'
end
periodic_flush = v
elseif k == "ssl" then
if type(v) ~= "boolean" then
return nil, '"ssl" must be a boolean value'
end
ssl = v
elseif k == "ssl_verify" then
if type(v) ~= "boolean" then
return nil, '"ssl_verify" must be a boolean value'
end
ssl_verify = v
elseif k == "sni_host" then
if type(v) ~= "string" then
return nil, '"sni_host" must be a string'
end
sni_host = v
end
end
if not (host and port) and not path then
return nil, "no logging server configured. \"host\"/\"port\" or "
.. "\"path\" is required."
end
if (flush_limit >= drop_limit) then
return nil, "\"flush_limit\" should be < \"drop_limit\""
end
flushing = false
exiting = false
connecting = false
connected = false
retry_connect = 0
retry_send = 0
logger_initted = true
if periodic_flush then
if debug then
ngx_log(DEBUG, "periodic flush enabled for every "
.. periodic_flush .. " seconds")
end
need_periodic_flush = true
timer_at(periodic_flush, _periodic_flush)
end
return logger_initted
end
function _M.log(msg)
if not logger_initted then
return nil, "not initialized"
end
local bytes
if type(msg) ~= "string" then
msg = tostring(msg)
end
local msg_len = #msg
if (debug) then
ngx.update_time()
ngx_log(DEBUG, ngx.now(), ":log message length: " .. msg_len)
end
-- response of "_flush_buffer" is not checked, because it writes
-- error buffer
if (is_exiting()) then
exiting = true
_write_buffer(msg, msg_len)
_flush_buffer()
if (debug) then
ngx_log(DEBUG, "Nginx worker is exiting")
end
bytes = 0
elseif (msg_len + buffer_size < flush_limit) then
_write_buffer(msg, msg_len)
bytes = msg_len
elseif (msg_len + buffer_size <= drop_limit) then
_write_buffer(msg, msg_len)
_flush_buffer()
bytes = msg_len
else
_flush_buffer()
if (debug) then
ngx_log(DEBUG, "logger buffer is full, this log message will be "
.. "dropped")
end
bytes = 0
--- this log message doesn't fit in buffer, drop it
end
if last_error then
local err = last_error
last_error = nil
return bytes, err
end
return bytes
end
function _M.initted()
return logger_initted
end
_M.flush = _flush
return _M
...@@ -2,14 +2,14 @@ ...@@ -2,14 +2,14 @@
worker_processes auto; worker_processes auto;
#error_log logs/error.log; #error_log logs/error.log;
#error_log logs/error.log notice; error_log logs/error.log notice;
#error_log logs/error.log info; #error_log logs/error.log info;
#pid logs/nginx.pid; #pid logs/nginx.pid;
events { events {
worker_connections 1024; worker_connections 10000;
} }
# worker_rlimit_nofile 30000; # worker_rlimit_nofile 30000;
...@@ -61,41 +61,20 @@ http { ...@@ -61,41 +61,20 @@ http {
#charset koi8-r; #charset koi8-r;
#access_log logs/host.access.log main; # access_log logs/host.access.log main;
# location / {
# root /Users/zhoush/Documents/Private/Notes;
# index README.html;
# }
location / { location / {
root /Users/zhoush/Documents/Private/Notes/; root /Users/zhoush/Documents/Private/Notes;
index README.html; index README.html;
lua_code_cache on;
log_by_lua_block {
local logger = require("socket")
if not logger.initted() then
local ok, err = logger.init {
host = '127.0.0.1',
port = 1234,
flush_limit = 4096
} }
if not ok then
ngx.log(ngx.ERR, "failed to initialize the logger: ", err)
return
end
end
-- construct the custom access log message in
-- the Lua variable "msg"
local bytes, err = logger.log(msg) # location / {
if err then # root /Users/zhoush/Documents/Private/Notes/;
ngx.log(ngx.ERR, "failed to log message: ", err) # index README.html;
return # lua_code_cache on;
end # content_by_lua_file lua/luabiz/log.lua;
} # # log_by_lua_file lua/luabiz/log.lua;
} # }
location /get_addr { location /get_addr {
content_by_lua_block { content_by_lua_block {
...@@ -105,6 +84,10 @@ http { ...@@ -105,6 +84,10 @@ http {
} }
} }
location /pos {
content_by_lua_file "lua/luabiz/pos.lua";
}
location /settle { location /settle {
content_by_lua_block { content_by_lua_block {
local settle = loadmod("settle") local settle = loadmod("settle")
...@@ -119,6 +102,9 @@ http { ...@@ -119,6 +102,9 @@ http {
} }
} }
location /log {
}
location /echo { location /echo {
content_by_lua_block { content_by_lua_block {
......
local _M = {}
local logger = require "resty.logger.socket"
function _M.__FILE__ () return string.match(debug.getinfo(2,'S').source, ".+/([^/]*%.%w+)$") end
function _M.__LINE__ () return debug.getinfo(2,'l').currentline end
function _M.log(self, ...)
if not logger.initted() then
local ok, err = logger.init{
host = '0.0.0.0',
port = 1234,
flush_limit = 2048, --日志长度大于flush_limit的时候会将msg信息推送一次
drop_limit = 99999,
}
if not ok then
ngx.log(ngx.ERR, "failed to initialize the logger: ",err)
return
end
end
local str = ""
for i = 1, select("#", ...) do
local tmp = tostring(select(i, ...))
str = string.format("%s %s", str, tmp)
end
local bytes, err = logger.log(str .. "\n")
if err then
ngx.log(ngx.ERR, "failed to log message: ", err)
ngx.log(ngx.NOTICE, str)
return
end
end
function _M.BEGIN(self, modname)
local str = string.format("\n\n\nNOTICE: " .. "-----------------------> <%s> BEGIN\n", string.upper(modname))
logger.log(str)
logger.flush()
end
function _M.END(self, modname)
local str = string.format("NOTICE: " .. "-----------------------> <%s> END\n\n\n", string.upper(modname))
logger.log(str)
logger.flush()
end
return _M
...@@ -5,121 +5,79 @@ local NODE_CARD = {} ...@@ -5,121 +5,79 @@ local NODE_CARD = {}
local mysql = loadmod("resty.mysql") local mysql = loadmod("resty.mysql")
local zx_base = loadmod("zx_base") local zx_base = loadmod("zx_base")
local log_lua = loadmod("log")
local M = string.format("%-16s", "<NODE_CARD>") local M = string.format("%-16s", "<NODE_CARD>")
-- local json = loadmod('cjson') -- local json = loadmod('cjson')
-- local db_conf = "tk7_dbs.json" -- local db_conf = "tk7_dbs.json"
local tasks = {} local db_biz
local function card_sql(db_st, total_time) local function BEGIN(tk_biz)
ngx.log(ngx.NOTICE, "-----------------------> START") log_lua:BEGIN("node_card")
local db, err = mysql:new() local err
if not db then db_biz, err = mysql:new()
ngx.log(ngx.ERR, "failed to instantiate mysql: ", err) if not db_biz then
log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: ", "failed to instantiate mysql: ", err)
return false return false
end end
db:set_timeout(1000) -- 1 sec db_biz:set_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{ local ok, err, errcode, sqlstate = db_biz:connect{
host = zx_base:_get_addr(db_st['host']), host = zx_base:_get_addr(tk_biz['host']),
port = db_st['port'], port = tk_biz['port'],
database = db_st['database'], database = tk_biz['database'],
user = db_st['user'], user = tk_biz['user'],
password = db_st['password'], password = tk_biz['password'],
timeout = db_st['timeout'], timeout = tk_biz['timeout'],
charset = 'utf8' charset = 'utf8'
} }
if not ok then if not ok then
ngx.log(ngx.ERR, err, ": ", errcode, " ", sqlstate) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: ", err, ": ", errcode, " ", sqlstate)
return return false
end end
return true
end
local function END()
zx_base:close_db(db_biz)
log_lua:END("node_card")
end
local function card_sql()
-- DELETE invalid rows -- DELETE invalid rows
-- local sql_delete = [[DELETE FROM node_card WHERE (card_status != 0 AND card_status !=1) or now() > card_exp]] -- local sql_delete = [[DELETE FROM node_card WHERE (card_status != 0 AND card_status !=1) or now() > card_exp]]
local sql_update = string.format([[UPDATE node_card SET card_status = 8 WHERE %d > card_exp]], os.date("%Y%m%d")) local sql_update = string.format([[UPDATE node_card SET card_status = 8 WHERE %d > card_exp]], os.date("%Y%m%d"))
local res, err, errcode, sqlstate = db:query(sql_update) local res, err, errcode, sqlstate = db_biz:query(sql_update)
ngx.log(ngx.NOTICE, sql_update) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql_update)
if not res then if not res then
ngx.log(ngx.ERR, "bad result: ", err, ": ", errcode, ": ", sqlstate, ".") log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: ", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
return return
end end
-- SET yesterday's balance -- SET yesterday's balance
local sql_update = [[UPDATE node_card SET card_lbal_amt = card_cbal_amt, card_lbal_count = card_cbal_count]] sql_update = [[UPDATE node_card SET card_lbal_amt = card_cbal_amt, card_lbal_count = card_cbal_count]]
local res, err, errcode, sqlstate = db:query(sql_update) local res, err, errcode, sqlstate = db_biz:query(sql_update)
ngx.log(ngx.NOTICE, sql_update) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql_update)
if not res then if not res then
ngx.log(ngx.ERR, "bad result: ", err, ": ", errcode, ": ", sqlstate, ".") log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: ", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
return return
end end
-- -- INSERT INTO history table
-- local sql_insert = string.format([[INSERT INTO node_card_his SELECT * FROM node_card ]]..
-- -- [[WHERE DATE(`create_time`) < DATE('%s')]],
-- -- [[WHERE EXTRACT(YEAR_MONTH FROM `create_time`) = EXTRACT(YEAR_MONTH FROM DATE_SUB('%s', INTERVAL 3 MONTH))]],
-- os.date("%Y%m%d%H%M%S", total_time))
-- ngx.log(ngx.NOTICE, sql_insert)
-- local res, err, errcode, sqlstate = db:query(sql_insert)
-- if not res then
-- ngx.log(ngx.ERR, "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
-- return
-- end
-- -- DELETE current tables record
-- local sql_delete = string.format([[DELETE FROM node_card WHERE DATE(`create_time`) < DATE('%s')]],
-- os.date("%Y%m%d%H%M%S", total_time - 24 * 60 * 60 ))
-- ngx.log(ngx.NOTICE, sql_delete)
-- local res, err, errcode, sqlstate = db:query(sql_delete)
-- if not res then
-- ngx.log(ngx.ERR, "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
-- return
-- end
ngx.log(ngx.NOTICE, "-----------------------> END")
zx_base:close_db(db)
end end
function NODE_CARD.run(self)
local function async(i, total_time)
local co = coroutine.wrap(
function()
card_sql(i, total_time)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function NODE_CARD.run(self, total_time)
-- -- local dbs_st = zx_base:db_read(db_conf)
for i = 1, #dbs_st.tk_biz do for i = 1, #dbs_st.tk_biz do
-- async(dbs_st.tk_biz[i]) if BEGIN(dbs_st.tk_biz[i]) then
card_sql(dbs_st.tk_biz[i], total_time) card_sql()
end
END()
end end
dispatch()
end end
return NODE_CARD return NODE_CARD
...@@ -2,112 +2,68 @@ local NODE_TICKET = {} ...@@ -2,112 +2,68 @@ local NODE_TICKET = {}
local mysql = loadmod("resty.mysql") local mysql = loadmod("resty.mysql")
local zx_base = loadmod('zx_base') local zx_base = loadmod('zx_base')
local logger = loadmod("log")
local db_conf = "tk7_dbs.json" local db_conf = "tk7_dbs.json"
local M = string.format("%-16s", "<NODE_TICKET>") local M = string.format("%-16s", "<NODE_TICKET>")
local tasks = {}
local db_biz
local function BEGIN(tk_biz)
logger:BEGIN("node_ticket")
local err
local function ticket_sql(db_st, total_time) db_biz, err = mysql:new()
ngx.log(ngx.NOTICE, "-----------------------> START") if not db_biz then
logger:log(logger:__FILE__() .. ":" .. logger:__LINE__(), "ERR: ", "failed to instantiate mysql: ", err)
local db, err = mysql:new()
if not db then
ngx.log(ngx.ERR, "failed to instantiate mysql: ", err)
return false return false
end end
db:set_timeout(1000) -- 1 sec db_biz:set_timeout(1000) -- 1 sec
local ok, err, errcode, sqlstate = db:connect{ local ok, err, errcode, sqlstate = db_biz:connect{
host = zx_base:_get_addr(db_st['host']), host = zx_base:_get_addr(tk_biz['host']),
port = db_st['port'], port = tk_biz['port'],
database = db_st['database'], database = tk_biz['database'],
user = db_st['user'], user = tk_biz['user'],
password = db_st['password'], password = tk_biz['password'],
timeout = db_st['timeout'], timeout = tk_biz['timeout'],
charset = 'utf8' charset = 'utf8'
} }
if not ok then if not ok then
ngx.log(ngx.ERR, err, ": ", errcode, " ", sqlstate) logger:log(logger:__FILE__() .. ":" .. logger:__LINE__(), "ERR: ", err, ": ", errcode, " ", sqlstate)
return return false
end
-- DELETE invalid rows
-- local sql_delete = [[DELETE FROM node_ticket WHERE (ticket_status != 0 AND ticket_status !=1) or now() > ticket_exp]]
local sql_update = string.format([[UPDATE node_ticket SET ticket_status = 8 WHERE %d > ticket_exp]], os.date("%Y%m%d"))
ngx.log(ngx.NOTICE, sql_update)
local res, err, errcode, sqlstate = db:query(sql_update)
if not res then
ngx.log(ngx.ERR, "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
return
end end
-- -- INSERT INTO history table return true
-- local sql_insert = string.format([[INSERT INTO node_ticket_his SELECT * FROM node_ticket WHERE ]]..
-- [[DATE(`create_time`) < DATE('%s')]],
-- os.date("%Y%m%d%H%M%S", total_time - 24 * 60 * 60))
-- ngx.log(ngx.NOTICE, sql_insert)
-- local res, err, errcode, sqlstate = db:query(sql_insert)
-- if not res then
-- ngx.log(ngx.ERR, "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
-- return
-- end
-- -- DELETE current tables record
-- local sql_delete = string.format([[DELETE FROM node_ticket WHERE DATE(`create_time`) < DATE('%s')]],
-- os.date("%Y%m%d%H%M%S", total_time - 24 * 60 * 60))
-- ngx.log(ngx.NOTICE, sql_delete)
-- local res, err, errcode, sqlstate = db:query(sql_delete)
-- if not res then
-- ngx.log(ngx.ERR, "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
-- return
-- end
-- ngx.log(ngx.NOTICE, "-----------------------> END")
zx_base:close_db(db)
end end
local function END()
zx_base:close_db(db_biz)
local function async(i, total_time) logger:END("node_ticket")
local co = coroutine.wrap(
function()
ticket_sql(i, total_time)
end
)
table.insert(tasks, co)
end end
local function dispatch() local function ticket_sql()
local i = 1 logger:BEGIN("node_ticket")
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]() -- UPDATE tickets' status
local sql_update = string.format([[UPDATE node_ticket SET ticket_status = 8 WHERE %d > ticket_exp]], os.date("%Y%m%d"))
logger:log(logger:__FILE__() .. ":" .. logger:__LINE__(), "NOTICE: ", sql_update)
local res, err, errcode, sqlstate = db_biz:query(sql_update)
if not res then if not res then
table.remove(tasks, i) logger:log(logger:__FILE__() .. ":" .. logger:__LINE__(), "ERR: ", "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
else return
i = i + 1
end
end end
end end
function NODE_TICKET.run(self, total_time) function NODE_TICKET.run(self)
-- local json = loadmod('cjson')
-- -- local dbs_st = zx_base:db_read(db_conf)
for i = 1, #dbs_st.tk_biz do for i = 1, #dbs_st.tk_biz do
-- async(dbs_st.tk_biz[i]) if BEGIN(dbs_st.tk_biz[i]) then
ticket_sql(dbs_st.tk_biz[i], total_time) ticket_sql()
end
END()
end end
dispatch()
end end
return NODE_TICKET return NODE_TICKET
...@@ -119,8 +119,8 @@ function SETTLE.run() ...@@ -119,8 +119,8 @@ function SETTLE.run()
-- node_vipacct: run() -- node_vipacct: run()
tk_sync: run() tk_sync: run()
node_card: run(t) node_card: run()
node_ticket: run(t) node_ticket: run()
tk_sdt: run(t) tk_sdt: run(t)
end end
......
...@@ -5,24 +5,24 @@ local TK_SDT = {} ...@@ -5,24 +5,24 @@ local TK_SDT = {}
local mysql = loadmod("resty.mysql") local mysql = loadmod("resty.mysql")
local zx_base = loadmod("zx_base") local zx_base = loadmod("zx_base")
local json = loadmod('cjson') local json = loadmod('cjson')
local log_lua = loadmod('log')
-- 协程组 local db_biz -- tk_biz handle
local tasks = {} local db_control -- tk_control handle
-- 门店统计 -- 初始化数据库连接
-- @param table tk_biz 节点机数据库对象 -- @param table tk_biz 节点机数据库对象
-- table tk_control 总控机数据库对象 -- table tk_control 总控机数据库对象
-- table tk_database 数据同步数据库名 -- @return true/false
-- @return nil local function BEGIN(tk_biz, tk_control)
local function sdt_sql(tk_biz, tk_control, total_time) log_lua:BEGIN("tk_sdt")
ngx.log(ngx.NOTICE, "-----------------------> START") local err
-- 新建mysql连接 -- 新建mysql连接
local db_biz, err = mysql:new() db_biz, err = mysql:new()
if not db_biz then if not db_biz then
ngx.log(ngx.ERR, "failed to instantiate mysql: ", err) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. "failed to instantiate mysql: ", err)
return false return false
end end
...@@ -40,14 +40,14 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -40,14 +40,14 @@ local function sdt_sql(tk_biz, tk_control, total_time)
charset = 'utf8' charset = 'utf8'
} }
if not ok then if not ok then
ngx.log(ngx.ERR, json.encode(tk_biz), ":", err, errcode, sqlstate) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. json.encode(tk_biz), ":", err, errcode, sqlstate)
return return false
end end
-- 新建mysql连接 -- 新建mysql连接
local db_control, err = mysql:new() db_control, err = mysql:new()
if not db_control then if not db_control then
ngx.log(ngx.ERR, "failed to instantiate mysql: ", err) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. "failed to instantiate mysql: ", err)
return false return false
end end
...@@ -65,19 +65,36 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -65,19 +65,36 @@ local function sdt_sql(tk_biz, tk_control, total_time)
charset = 'utf8' charset = 'utf8'
} }
if not ok then if not ok then
ngx.log(ngx.ERR, json.encode(tk_control), ":", err, errcode, sqlstate) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. json.encode(tk_control), ":", err, errcode, sqlstate)
return return false
end end
return true;
end
-- 清理连接句柄
-- @param nil
-- @return nil
local function END()
zx_base:close_db(db_biz)
zx_base:close_db(db_control)
log_lua:END("tk_sdt")
end
-- 门店统计
-- @param nil
-- @return nil
local function sdt_sql(tk_biz, tk_control, total_time)
-- GET shop_map_id list -- GET shop_map_id list
local shops local shops
-- local sql_select = 'SELECT DISTINCT shop_map_id FROM node_saleorder UNION SELECT DISTINCT shop_map_id FROM node_rchg;' -- local sql_select = 'SELECT DISTINCT shop_map_id FROM node_saleorder UNION SELECT DISTINCT shop_map_id FROM node_rchg;'
local sql_select = 'SELECT mer_map_id, shop_map_id FROM tk_shop' local sql_select = 'SELECT mer_map_id, shop_map_id FROM tk_shop'
shops, err, errcode, sqlstate = db_biz:query(sql_select) shops, err, errcode, sqlstate = db_biz:query(sql_select)
ngx.log(ngx.NOTICE, sql_select) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql_select)
if not shops then if not shops then
ngx.log(ngx.ERR, "failed to query:", json.encode(tk_biz), ":", err, ":", errcode, ":", sqlstate, ".") log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. "failed to query:", json.encode(tk_biz), ":", err, ":", errcode, ":", sqlstate, ".")
return return
end end
...@@ -90,22 +107,24 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -90,22 +107,24 @@ local function sdt_sql(tk_biz, tk_control, total_time)
shop = shops[i] shop = shops[i]
-- -- CHECK if exists -- CHECK if exists
-- local sql_do = string.format( local sql_check = string.format(
-- [[SELECT * FROM tk_sdt WHERE DATE(total_date) = DATE('%s') ]].. [[SELECT * FROM tk_sdt WHERE DATE(total_date) = DATE('%s') ]]..
-- [[AND shop_map_id = %s]], [[AND shop_map_id = %s]],
-- os.date('%Y%m%d',os.time()), os.date('%Y%m%d',os.time()),
-- tostring(shop.shop_map_id)) tostring(shop.shop_map_id))
-- res, err, errcode, sqlstate = db_biz:query(sql_do) res, err, errcode, sqlstate = db_biz:query(sql_check)
-- if not res then if not res then
-- ngx.log(ngx.ERR, "failed to connect:", json.encode(tk_biz), ":", err, ":", errcode, ":", sqlstate, ".") log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. "failed to connect:", json.encode(tk_biz),
-- ngx.log(ngx.ERR, sql_do) ":", err, ":", errcode, ":", sqlstate, ".")
-- return log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. sql_check)
-- end return
-- if #res ~= 0 then end
-- ngx.log(ngx.NOTICE, shop['shop_map_id'], " finished already") if #res ~= 0 then
-- break log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. shop['shop_map_id'],
-- end " finished already")
break
end
-- GET summary -- GET summary
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -125,7 +144,8 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -125,7 +144,8 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date("%Y%m%d", total_time-24*60*60), os.date("%Y%m%d", total_time-24*60*60),
tostring(shop.shop_map_id)) tostring(shop.shop_map_id))
) )
-- ngx.log(ngx.NOTICE, sql[1])
-- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[1])
-- GET online records -- GET online records
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -138,7 +158,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -138,7 +158,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date("%Y%m%d", total_time-24*60*60), os.date("%Y%m%d", total_time-24*60*60),
tostring(shop.shop_map_id)) tostring(shop.shop_map_id))
) )
-- ngx.log(ngx.NOTICE, sql[2]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[2])
-- GET VIP/NOVIP records -- GET VIP/NOVIP records
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -151,7 +171,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -151,7 +171,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date("%Y%m%d", total_time-24*60*60), os.date("%Y%m%d", total_time-24*60*60),
tostring(shop.shop_map_id)) tostring(shop.shop_map_id))
) )
-- ngx.log(ngx.NOTICE, sql[3]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[3])
-- GET MS records -- GET MS records
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -164,7 +184,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -164,7 +184,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date("%Y%m%d", total_time-24*60*60), os.date("%Y%m%d", total_time-24*60*60),
tostring(shop.shop_map_id)) tostring(shop.shop_map_id))
) )
-- ngx.log(ngx.NOTICE, sql[4]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[4])
-- GET 0000-0900 order -- GET 0000-0900 order
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -177,7 +197,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -177,7 +197,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date('%Y%m%d000000', total_time-24*60*60), os.date('%Y%m%d000000', total_time-24*60*60),
os.date('%Y%m%d090000', total_time-24*60*60)) os.date('%Y%m%d090000', total_time-24*60*60))
) )
-- ngx.log(ngx.NOTICE, sql[5]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[5])
-- GET 0900-1100 order -- GET 0900-1100 order
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -190,7 +210,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -190,7 +210,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date('%Y%m%d090000', total_time-24*60*60), os.date('%Y%m%d090000', total_time-24*60*60),
os.date('%Y%m%d110000', total_time-24*60*60)) os.date('%Y%m%d110000', total_time-24*60*60))
) )
-- ngx.log(ngx.NOTICE, sql[6]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[6])
-- GET 1100-1300 order -- GET 1100-1300 order
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -203,7 +223,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -203,7 +223,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date('%Y%m%d110000', total_time-24*60*60), os.date('%Y%m%d110000', total_time-24*60*60),
os.date('%Y%m%d130000', total_time-24*60*60)) os.date('%Y%m%d130000', total_time-24*60*60))
) )
-- ngx.log(ngx.NOTICE, sql[7]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[7])
-- GET 1300-1500 order -- GET 1300-1500 order
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -216,7 +236,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -216,7 +236,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date('%Y%m%d130000', total_time-24*60*60), os.date('%Y%m%d130000', total_time-24*60*60),
os.date('%Y%m%d150000', total_time-24*60*60)) os.date('%Y%m%d150000', total_time-24*60*60))
) )
-- ngx.log(ngx.NOTICE, sql[8]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[8])
-- GET 1500-1700 order -- GET 1500-1700 order
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -229,7 +249,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -229,7 +249,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date('%Y%m%d150000', total_time-24*60*60), os.date('%Y%m%d150000', total_time-24*60*60),
os.date('%Y%m%d170000', total_time-24*60*60)) os.date('%Y%m%d170000', total_time-24*60*60))
) )
-- ngx.log(ngx.NOTICE, sql[9]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[9])
-- GET 1700-1900 order -- GET 1700-1900 order
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -242,7 +262,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -242,7 +262,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date('%Y%m%d170000', total_time-24*60*60), os.date('%Y%m%d170000', total_time-24*60*60),
os.date('%Y%m%d190000', total_time-24*60*60)) os.date('%Y%m%d190000', total_time-24*60*60))
) )
-- ngx.log(ngx.NOTICE, sql[10]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[10])
-- GET 1900-2100 order -- GET 1900-2100 order
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -255,7 +275,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -255,7 +275,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date('%Y%m%d190000', total_time-24*60*60), os.date('%Y%m%d190000', total_time-24*60*60),
os.date('%Y%m%d210000', total_time-24*60*60)) os.date('%Y%m%d210000', total_time-24*60*60))
) )
-- ngx.log(ngx.NOTICE, sql[11]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[11])
-- GET 2100-2400 order -- GET 2100-2400 order
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -268,7 +288,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -268,7 +288,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date('%Y%m%d210000', total_time-24*60*60), os.date('%Y%m%d210000', total_time-24*60*60),
os.date('%Y%m%d235959', total_time-24*60*60)) os.date('%Y%m%d235959', total_time-24*60*60))
) )
-- ngx.log(ngx.NOTICE, sql[12]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[12])
-- get VIP cnt -- get VIP cnt
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -280,7 +300,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -280,7 +300,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date('%Y%m%d', total_time-24*60*60), os.date('%Y%m%d', total_time-24*60*60),
tostring(shop['shop_map_id'])) tostring(shop['shop_map_id']))
) )
-- ngx.log(ngx.NOTICE, sql[13]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[13])
-- GET VIP new cnt -- GET VIP new cnt
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -292,7 +312,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -292,7 +312,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date("%Y%m%d", total_time-24*60*60), os.date("%Y%m%d", total_time-24*60*60),
tostring(shop['shop_map_id'])) tostring(shop['shop_map_id']))
) )
-- ngx.log(ngx.NOTICE, sql[14]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[14])
-- GET charge summary -- GET charge summary
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -303,7 +323,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -303,7 +323,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date("%Y%m%d", total_time-24*60*60), os.date("%Y%m%d", total_time-24*60*60),
tostring(shop.shop_map_id)) tostring(shop.shop_map_id))
) )
-- ngx.log(ngx.NOTICE, sql[15]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[15])
-- GET cash summary -- GET cash summary
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -315,7 +335,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -315,7 +335,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date("%Y%m%d", total_time-24*60*60), os.date("%Y%m%d", total_time-24*60*60),
tostring(shop.shop_map_id)) tostring(shop.shop_map_id))
) )
-- ngx.log(ngx.NOTICE, sql[16]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[16])
-- GET WeiXin summary -- GET WeiXin summary
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -327,7 +347,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -327,7 +347,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date("%Y%m%d", total_time-24*60*60), os.date("%Y%m%d", total_time-24*60*60),
tostring(shop.shop_map_id)) tostring(shop.shop_map_id))
) )
-- ngx.log(ngx.NOTICE, sql[17]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[17])
-- GET AliPay summary -- GET AliPay summary
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -339,7 +359,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -339,7 +359,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date("%Y%m%d", total_time-24*60*60), os.date("%Y%m%d", total_time-24*60*60),
tostring(shop.shop_map_id)) tostring(shop.shop_map_id))
) )
-- ngx.log(ngx.NOTICE, sql[18]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[18])
-- GET bank summary -- GET bank summary
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -351,7 +371,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -351,7 +371,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date("%Y%m%d", total_time-24*60*60), os.date("%Y%m%d", total_time-24*60*60),
tostring(shop.shop_map_id)) tostring(shop.shop_map_id))
) )
-- ngx.log(ngx.NOTICE, sql[19]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[19])
-- GET vipcard summary -- GET vipcard summary
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -363,7 +383,7 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -363,7 +383,7 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date("%Y%m%d", total_time-24*60*60), os.date("%Y%m%d", total_time-24*60*60),
tostring(shop.shop_map_id)) tostring(shop.shop_map_id))
) )
-- ngx.log(ngx.NOTICE, sql[20]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[20])
-- GET othpay summary -- GET othpay summary
table.insert(sql, string.format( table.insert(sql, string.format(
...@@ -375,11 +395,11 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -375,11 +395,11 @@ local function sdt_sql(tk_biz, tk_control, total_time)
os.date("%Y%m%d", total_time-24*60*60), os.date("%Y%m%d", total_time-24*60*60),
tostring(shop.shop_map_id)) tostring(shop.shop_map_id))
) )
-- ngx.log(ngx.NOTICE, sql[21]) -- log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. sql[21])
res, err, errcode, sqlstate = db_biz:query(table.concat(sql)) res, err, errcode, sqlstate = db_biz:query(table.concat(sql))
if not res then if not res then
ngx.log(ngx.ERR, "failed to query:", json.encode(tk_biz), ":", err, ":", errcode, ":", sqlstate, ".") log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. "failed to query:", json.encode(tk_biz), ":", err, ":", errcode, ":", sqlstate, ".")
return return
end end
...@@ -396,8 +416,8 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -396,8 +416,8 @@ local function sdt_sql(tk_biz, tk_control, total_time)
-- repeat -- repeat
res, err, errcode, sqlstate = db_biz:read_result() res, err, errcode, sqlstate = db_biz:read_result()
if not res then if not res then
ngx.log(ngx.ERR, "bad sql: ", sql[j]) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. "bad sql: ", sql[j])
ngx.log(ngx.ERR, "bad result #", j, ": ", err, ": ", errcode, ": ", sqlstate, ".") log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. "bad result #", j, ": ", err, ": ", errcode, ": ", sqlstate, ".")
return ngx.exit(500) return ngx.exit(500)
end end
...@@ -409,12 +429,12 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -409,12 +429,12 @@ local function sdt_sql(tk_biz, tk_control, total_time)
end end
-- until(err ~= "again") -- until(err ~= "again")
end end
ngx.log(ngx.NOTICE, log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " ..
string.format("================ TK_SDT [%s - %s] ==========", string.format("================ TK_SDT [%s - %s] ==========",
tostring(shop['shop_map_id']), tostring(shop['shop_map_id']),
os.date("%Y%m%d", total_time))) os.date("%Y%m%d", total_time)))
for k, v in pairs(shop) do for k, v in pairs(shop) do
ngx.log(ngx.NOTICE, string.format("%-32s : %s", k, tostring(v))) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. string.format("%-32s : %s", k, tostring(v)))
end end
...@@ -513,6 +533,15 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -513,6 +533,15 @@ local function sdt_sql(tk_biz, tk_control, total_time)
shop['t_order_novip_amt'] = shop['t_order_amt'] - shop['t_order_vip_amt'] shop['t_order_novip_amt'] = shop['t_order_amt'] - shop['t_order_vip_amt']
shop['t_order_novip_cnt'] = shop['t_order_cnt'] - shop['t_order_vip_cnt'] shop['t_order_novip_cnt'] = shop['t_order_cnt'] - shop['t_order_vip_cnt']
shop['sku_turn_t'] = 0
shop['benefit_amt'] = 0
shop['rank_benefit'] = 0
shop['rank_sale_cnt'] = 0
shop['rank_sale_amt'] = 0
shop['rank_sale_profit'] = 0
shop['rank_sale_profit_per'] = 0
if shop['t_order_amt'] == 0 or shop['t_order_cnt'] == 0 then if shop['t_order_amt'] == 0 or shop['t_order_cnt'] == 0 then
shop['t_order_avg_price'] = 0 shop['t_order_avg_price'] = 0
...@@ -532,18 +561,18 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -532,18 +561,18 @@ local function sdt_sql(tk_biz, tk_control, total_time)
-- INSERT INTO tk_biz database -- INSERT INTO tk_biz database
res, err, errcode, sqlstate = db_biz:query(str) res, err, errcode, sqlstate = db_biz:query(str)
ngx.log(ngx.NOTICE, str) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. str)
if not res then if not res then
ngx.log(ngx.ERR, "failed to query:", json.encode(tk_biz), log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. "failed to query:", json.encode(tk_biz),
":", err, ":", errcode, ":", sqlstate, ".") ":", err, ":", errcode, ":", sqlstate, ".")
return return
end end
-- INSERT INTO tk_control database -- INSERT INTO tk_control database
res, err, errcode, sqlstate = db_control:query(str) res, err, errcode, sqlstate = db_control:query(str)
ngx.log(ngx.NOTICE, str) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: " .. str)
if not res then if not res then
ngx.log(ngx.ERR, "failed to query:", json.encode(tk_control), log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. "failed to query:", json.encode(tk_control),
":", err, ":", errcode, ":", sqlstate, ".") ":", err, ":", errcode, ":", sqlstate, ".")
-- return -- return
end end
...@@ -553,59 +582,20 @@ local function sdt_sql(tk_biz, tk_control, total_time) ...@@ -553,59 +582,20 @@ local function sdt_sql(tk_biz, tk_control, total_time)
break break
end end
end end
zx_base:close_db(db_biz)
zx_base:close_db(db_control)
ngx.log(ngx.NOTICE, "-----------------------> END")
end
-- 异步运行
-- @param table tk_biz 节点机数据库对象
-- table tk_control 总控机数据库对象
-- table tk_database 数据同步数据库名
-- @return nil
local function async(tk_biz, tk_control, total_time)
local co = coroutine.wrap(
function()
sdt_sql(tk_biz, tk_control, total_time)
end
)
table.insert(tasks, co)
end
-- 协程调度
-- @param nil
-- @return nil
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end end
-- 跑批业务处理 -- 跑批业务处理
-- @param nil -- @param nil
-- @return nil -- @return nil
function TK_SDT.run(self, total_time) function TK_SDT.run(self, total_time)
-- local dbs_st = zx_base:db_read(db_conf)
for i = 1, #dbs_st.tk_biz do for i = 1, #dbs_st.tk_biz do
-- async(dbs_st.tk_biz[i], dbs_st.tk_control, total_time) local tk_biz = dbs_st.tk_biz[i]
sdt_sql(dbs_st.tk_biz[i], dbs_st.tk_control, total_time) local tk_control = dbs_st.tk_control
if BEGIN(tk_biz, tk_control) then
sdt_sql(tk_biz, tk_controltotal_time)
end
END()
end end
-- dispatch()
end end
return TK_SDT return TK_SDT
...@@ -7,33 +7,31 @@ local mysql = loadmod("resty.mysql") ...@@ -7,33 +7,31 @@ local mysql = loadmod("resty.mysql")
local zx_base = loadmod("zx_base") local zx_base = loadmod("zx_base")
local json = loadmod('cjson') local json = loadmod('cjson')
local db_conf = "tk7_dbs.json" local db_conf = "tk7_dbs.json"
local log_lua = loadmod('log')
-- 协程组 local db_biz
local tasks = {} local db_control
-- 初始化数据库连接
local function EOF()
end
-- 表信息
-- @param table tk_biz 节点机数据库对象 -- @param table tk_biz 节点机数据库对象
-- table tk_control 总控机数据库对象 -- table tk_control 总控机数据库对象
-- table tk_table 数据同步表名 -- @return true/false
-- @return nil local function BEGIN(tk_biz, tk_control)
local function sync_sql(tk_biz, tk_control, tk_table) log_lua:BEGIN("tk_sync")
ngx.log(ngx.NOTICE, "-----------------------> START") local err
-- 新建mysql连接 -- 新建mysql连接
local db, err = mysql:new() db_biz, err = mysql:new()
if not db then if not db_biz then
ngx.log(ngx.ERR, "failed to instantiate mysql: ", err) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. "failed to instantiate mysql: ", err)
return false return false
end end
db:set_timeout(1000) -- 1 sec
-- 连接节点机MySQL -- 超时设置
local ok, err, errcode, sqlstate = db:connect{ db_biz:set_timeout(1000)
-- 连接tk_biz
local ok, err, errcode, sqlstate = db_biz:connect{
host = zx_base:_get_addr(tk_biz['host']), host = zx_base:_get_addr(tk_biz['host']),
port = tk_biz['port'], port = tk_biz['port'],
database = tk_biz['database'], database = tk_biz['database'],
...@@ -43,39 +41,22 @@ local function sync_sql(tk_biz, tk_control, tk_table) ...@@ -43,39 +41,22 @@ local function sync_sql(tk_biz, tk_control, tk_table)
charset = 'utf8' charset = 'utf8'
} }
if not ok then if not ok then
ngx.log(ngx.ERR, json.encode(tk_biz), ":", err, errcode, sqlstate) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. json.encode(tk_biz), ":", err, errcode, sqlstate)
return return false
end end
-- 新建mysql连接
-- 查询节点机 db_control, err = mysql:new()
local sql = string.format([[SELECT * FROM %s where update_time > %s]], if not db_control then
tk_table, log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. "failed to instantiate mysql: ", err)
os.date("%Y%m%d000000", os.time()-24*60*60) return false
)
ngx.log(ngx.NOTICE, sql)
local syncs
syncs, err, errcode, sqlstate = db:query(sql)
if not syncs then
ngx.log(ngx.ERR, "failed to query:", json.encode(tk_biz), ":",
err, ":", errcode, ":", sqlstate, ".")
return
end
if #syncs == 0 then
return "00", "SUCCESS"
end end
-- 超时设置
db_control:set_timeout(1000)
-- 重置MySQL连接 -- 连接tk_control
zx_base:close_db(db) local ok, err, errcode, sqlstate = db_control:connect{
db, err = mysql:new()
if not db then
ngx.log(ngx.ERR, "failed to instantiate mysql: ", err)
return
end
db:set_timeout(1000)
-- 连接总控机MySQL
ok, err, errcode, sqlstate = db:connect{
host = zx_base:_get_addr(tk_control['host']), host = zx_base:_get_addr(tk_control['host']),
port = tk_control['port'], port = tk_control['port'],
database = tk_control['database'], database = tk_control['database'],
...@@ -85,63 +66,58 @@ local function sync_sql(tk_biz, tk_control, tk_table) ...@@ -85,63 +66,58 @@ local function sync_sql(tk_biz, tk_control, tk_table)
charset = 'utf8' charset = 'utf8'
} }
if not ok then if not ok then
ngx.log(ngx.ERR, json.encode(tk_control), ":", err, errcode, sqlstate) log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: " .. json.encode(tk_control), ":", err, errcode, sqlstate)
return return false
end
-- 插入总控机
for i = 1, #syncs do
local sync = syncs[i]
local sql_result = zx_base:sql_concate("duplicate", tk_table, sync)
ngx.log(ngx.NOTICE, sql_result)
ok, err, errcode, sqlstate = db:query(sql_result)
if not ok then
ngx.log(ngx.ERR, "failed to query:", json.encode(tk_control), ":",
err, ":", errcode, ":", sqlstate, ".")
end
end end
ngx.log(ngx.NOTICE, "-----------------------> END") return true;
end end
-- 清理连接句柄
-- @param nil
-- @return nil
local function END()
zx_base:close_db(db_biz)
zx_base:close_db(db_control)
log_lua:END("tk_sync")
end
-- 异步运行 -- 表信息
-- @param table tk_biz 节点机数据库对象 -- @param table tk_biz 节点机数据库对象
-- table tk_control 总控机数据库对象 -- table tk_control 总控机数据库对象
-- table tk_table 数据同步表名 -- table tk_table 数据同步表名
-- @return nil -- @return nil
local function async(tk_biz, tk_control, tk_table) local function sync_sql(tk_biz, tk_control, tk_table)
local co = coroutine.wrap(
function()
sync_sql(tk_biz, tk_control, tk_table)
end
)
table.insert(tasks, co)
end
-- 协程调度 -- 查询节点机
-- @param nil local sql = string.format([[SELECT * FROM %s where update_time > %s]],
-- @return nil tk_table,
local function dispatch() os.date("%Y%m%d000000", os.time()-24*60*60)
local i = 1 )
while true do log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: ", sql)
if tasks[i] == nil then local syncs, err, errcode, sqlstate = db_biz:query(sql)
if tasks[1] == nil then if not syncs then
break log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: ", "failed to query:", json.encode(tk_biz), ":",
err, ":", errcode, ":", sqlstate, ".")
return
end end
i = 1 if #syncs == 0 then
return "00", "SUCCESS"
end end
local res = tasks[i]() -- 插入总控机
if not res then for i = 1, #syncs do
table.remove(tasks, i) local sync = syncs[i]
else local sql_result = zx_base:sql_concate("duplicate", tk_table, sync)
i = i + 1 log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: ", sql_result)
local ok, err, errcode, sqlstate = db_control:query(sql_result)
if not ok then
log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "ERR: ", "failed to query:", json.encode(tk_control), ":",
err, ":", errcode, ":", sqlstate, ".")
end end
end end
end end
...@@ -149,12 +125,15 @@ end ...@@ -149,12 +125,15 @@ end
-- @param nil -- @param nil
-- @return nil -- @return nil
function TK_SYNC.run() function TK_SYNC.run()
-- local dbs_st = zx_base:db_read(db_conf)
for i = 1, #dbs_st.tk_biz do for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], dbs_st.tk_control, "tk_shop") local tk_biz = dbs_st.tk_biz[i]
async(dbs_st.tk_biz[i], dbs_st.tk_control, "tk_casher") local tk_control = dbs_st.tk_control
if BEGIN(tk_biz, tk_control) then
sync_sql(tk_biz, tk_control, "tk_shop")
sync_sql(tk_biz, tk_control, "tk_casher")
end
END()
end end
dispatch()
end end
...@@ -191,7 +170,10 @@ local function tk_sync() ...@@ -191,7 +170,10 @@ local function tk_sync()
else else
-- async(dbs_st.tk_biz[i], dbs_st.tk_control, data.dbTable) -- async(dbs_st.tk_biz[i], dbs_st.tk_control, data.dbTable)
if BEGIN(dbs_st.tk_biz[i], dbs_st.tk_control) then
sync_sql(dbs_st.tk_biz[i], dbs_st.tk_control, data.dbTable) sync_sql(dbs_st.tk_biz[i], dbs_st.tk_control, data.dbTable)
end
END()
code = "00" code = "00"
mesg = "SUCCESS" mesg = "SUCCESS"
...@@ -212,9 +194,12 @@ end ...@@ -212,9 +194,12 @@ end
function TK_SYNC.sync() function TK_SYNC.sync()
local code, mesg = tk_sync() local code, mesg = tk_sync()
-- http eof
ngx.say(json.encode({mesgRetCode=code, mesgRetDesc=mesg})) ngx.say(json.encode({mesgRetCode=code, mesgRetDesc=mesg}))
ngx.eof() ngx.eof()
ngx.log(ngx.NOTICE, ngx.var.request_body)
-- log eof
log_lua:log(log_lua:__FILE__() .. ":" .. log_lua:__LINE__(), "NOTICE: ", ngx.var.request_body)
end end
......
...@@ -2,24 +2,6 @@ local ZX_BASE = {__index=_G} ...@@ -2,24 +2,6 @@ local ZX_BASE = {__index=_G}
local json = loadmod ('cjson') local json = loadmod ('cjson')
function ZX_BASE.__FILE__ () return debug.getinfo(2,'S').source end
function ZX_BASE.__LINE__ () return debug.getinfo(2,'l').currentline end
function ZX_BASE.log(self, fname, ...)
local fpath = string.format("%s/%s_%s.log", ngx.var.logs_path, tostring(fname), os.date("%Y%m%d"))
local f = io.open(fpath, "a+")
local str = ""
for i=1, select("#", ...) do
local tmp= tostring(select(i, ...))
str = string.format("%s %s", str, tmp)
end
f:write(os.date("<%Y-%m-%d %H:%M:%S> "))
f:write(str)
f:write("\n")
f:close()
end
function ZX_BASE.db_read(self, dbpath) function ZX_BASE.db_read(self, dbpath)
local f = io.open(ngx.var.conf_path.."/"..dbpath, "r") local f = io.open(ngx.var.conf_path.."/"..dbpath, "r")
local tmp = f:read("a*") local tmp = f:read("a*")
......
...@@ -83,7 +83,6 @@ function _G._read_dns_servers_from_resolv_file(self) ...@@ -83,7 +83,6 @@ function _G._read_dns_servers_from_resolv_file(self)
end end
end end
-- _G.fpath = string.format("/var/log/www/S_%s.log", os.date("%Y%m%d")) -- _G.fpath = string.format("/var/log/www/S_%s.log", os.date("%Y%m%d"))
-- _G.f = io.open(_G.fpath, "a+") -- _G.f = io.open(_G.fpath, "a+")
...@@ -99,19 +98,25 @@ _G.dbs_st = { ...@@ -99,19 +98,25 @@ _G.dbs_st = {
}, },
tk_control = { tk_control = {
host = "rm-bp1u9rp1p216nb15f.mysql.rds.aliyuncs.com", host = "rm-bp1u9rp1p216nb15f.mysql.rds.aliyuncs.com",
port = 3306,
database = "tk_control",
user = "tk_control_user", user = "tk_control_user",
password = "KJjsdn%zx3@jr#y57dhh20drYT!z", password = "KJjsdn%zx3@jr#y57dhh20drYT!z",
-- host = "192.168.254.10",
-- user = "root",
-- password = "control_pwd",
port = 3306,
database = "tk_control",
timeout = 30 timeout = 30
}, },
tk_biz = { tk_biz = {
{ {
host = "rm-bp1rob0a7ry7zmc0n.mysql.rds.aliyuncs.com", host = "rm-bp1rob0a7ry7zmc0n.mysql.rds.aliyuncs.com",
port = 3306,
database = "tk_biz",
user = "tk_node_user", user = "tk_node_user",
password = "KJjsnc%gs5@jr#y789sajkshdrYT!z", password = "KJjsnc%gs5@jr#y789sajkshdrYT!z",
-- host = "192.168.254.20",
-- user = "root",
-- password = "node_pwd",
port = 3306,
database = "tk_biz",
timeout = 30 timeout = 30
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment