Commit 3be52cb0 authored by 周尚's avatar 周尚

优化数据同步接口 [完成]

parent 2b5305ba
No preview for this file type
No preview for this file type
No preview for this file type
......@@ -33,6 +33,7 @@ http {
lua_package_path "/Users/zhoush/openresty/nginx/lua/luabiz/?.lua;;";
init_by_lua_file "lua/luainit/init.lua";
# init_worker_by_lua_file "luabiz/settle/settle.lua";
......@@ -48,7 +49,7 @@ http {
# ssl_ciphers HIGH:!aNULL:!MD5;
# ssl_prefer_server_ciphers on;
set $server_dir /Users/zhoush/openresty/;
set $server_dir /Users/zhoush/openresty;
# set $lua_dir lua/;
set $biz_dir lua/luabiz;
set $logs_path $server_dir/nginx/logs;
......@@ -78,6 +79,13 @@ http {
}
}
location /table_sync {
content_by_lua_block {
local tk_sync = loadmod("tk_sync")
tk_sync:sync()
}
}
location /echo {
content_by_lua_block {
......
......@@ -150,6 +150,11 @@ local function main()
local str = ""
local resp = {}
if ngx.req.get_method() ~= "POST" then
return sync_err_tell(resp, "03", "Invalid Method")
end
local req = json.decode(ngx.var.request_body)
ok, err, msg, str = sync_bizz_switch(req)
......
local _CASHER = {}
local MODULE="S" -- Settle
-- 加载模块
local mysql = loadmod("resty.mysql")
local zx_base = loadmod("zx_base")
local json = loadmod('cjson')
local db_conf = "tk7_dbs.json"
-- 协和组
local tasks = {}
local function EOF()
zx_base:log(MODULE, "-----------------------> END[TK_CASHER]")
ngx.eof()
end
-- 门店信息
-- @param table tk_biz 节点机数据库对象
-- table tk_control 总控机数据库对象
-- @return nil
local function casher_sql(tk_biz, tk_control)
zx_base:log(MODULE, "-----------------------> START[TK_CASHER]")
-- 新建mysql连接
local db, err = mysql:new()
if not db then
zx_base:log(MODULE, "[TK_CASHER]", "failed to instantiate mysql: ", err)
return false
end
-- 连接节点机MySQL
local ok, err, errcode, sqlstate = db:connect{
host = tk_biz['host'],
port = tk_biz['port'],
database = tk_biz['database'],
user = tk_biz['user'],
password = tk_biz['password'],
timeout = tk_biz['timeout'],
charset = 'utf8'
}
if not ok then
zx_base:log(MODULE, "[TK_CASHER]", "failed to connect:", json.encode(tk_biz), ":",
err, ":", errcode, sqlstate)
return
end
-- 查询节点机
local sql = string.format([[SELECT * FROM tk_casher where update_time > %s]],
-- os.date("%Y%m%d000000", os.time()-24*60*60)
"20180716000000"
)
local cashers
cashers, err, errcode, sqlstate = db:query(sql)
if not cashers then
zx_base:log(MODULE, "[TK_CASHER]", sql)
zx_base:log(MODULE, "[TK_CASHER]", "failed to query:", json.encode(tk_biz), ":",
err, ":", errcode, ":", sqlstate, ".")
return
end
if #cashers == 0 then
ngx.say("no cashers found")
ngx.eof()
return
end
-- 重置MySQL连接
zx_base:close_db(db)
db, err = mysql:new()
if not db then
zx_base:log(MODULE, "[TK_CASHER]", "failed to instantiate mysql: ", err)
return false
end
-- 连接总控机MySQL
ok, err, errcode, sqlstate = db:connect{
host = tk_control['host'],
port = tk_control['port'],
database = tk_control['database'],
user = tk_control['user'],
password = tk_control['password'],
timeout = tk_control['timeout'],
charset = 'utf8'
}
if not ok then
zx_base:log(MODULE, "[TK_CASHER]", "failed to connect:", json.encode(tk_biz), ":", errcode, sqlstate)
return
end
-- 插入总控机
for i = 1, #cashers do
local casher = cashers[i]
local sql_result = zx_base:sql_concate("duplicate", "tk_casher", casher)
ok, err, errcode, sqlstate = db:query(sql_result)
if not ok then
zx_base:log(MODULE, "[TK_CASHER]", sql_result)
zx_base:log(MODULE, "[TK_CASHER]", "failed to query:", json.encode(tk_control), ":",
err, ":", errcode, ":", sqlstate, ".")
end
end
ngx.say("SUCCESS")
EOF()
end
local function async(tk_biz, tk_control)
local co = coroutine.wrap(
function()
casher_sql(tk_biz, tk_control)
end
)
table.insert(tasks, co)
end
local function dispatch()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks, i)
else
i = i + 1
end
end
end
function _CASHER.run()
local dbs_st = zx_base:db_read(db_conf)
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], dbs_st.tk_control)
end
dispatch()
end
return _CASHER
local _SHOP = {}
local TK_SYNC = {}
local MODULE="S" -- Settle
......@@ -14,57 +14,58 @@ local tasks = {}
local function EOF()
zx_base:log(MODULE, "-----------------------> END[TK_SHOP]")
zx_base:log(MODULE, "-----------------------> END[TK_SYNC]")
ngx.eof()
end
-- 门店信息
-- 信息
-- @param table tk_biz 节点机数据库对象
-- table tk_control 总控机数据库对象
-- @return nil
local function shop_sql(tk_biz, tk_control)
zx_base:log(MODULE, "-----------------------> START[TK_SHOP]")
local function sync_sql(tk_biz, tk_control, tk_table, tk_database)
zx_base:log(MODULE, "-----------------------> START[TK_SYNC]")
-- 新建mysql连接
local db, err = mysql:new()
if not db then
zx_base:log(MODULE, "[TK_SHOP]", "failed to instantiate mysql: ", err)
zx_base:log(MODULE, "[TK_SYNC]", "failed to instantiate mysql: ", err)
return false
end
-- 连接节点机MySQL
local ok, err, errcode, sqlstate = db:connect{
host = tk_biz['host'],
port = tk_biz['port'],
database = tk_biz['database'],
database = tk_database,
user = tk_biz['user'],
password = tk_biz['password'],
timeout = tk_biz['timeout'],
charset = 'utf8'
}
if not ok then
zx_base:log(MODULE, "[TK_SHOP]", "failed to connect:", json.encode(tk_biz), ":",
zx_base:log(MODULE, "[TK_SYNC]", "failed to connect:", json.encode(tk_biz), ":",
err, ":", errcode, sqlstate)
return
end
-- 查询节点机
local sql = string.format([[SELECT * FROM tk_shop where update_time > %s]],
local sql = string.format([[SELECT * FROM %s where update_time > %s]],
-- os.date("%Y%m%d000000", os.time()-24*60*60)
tk_table,
"20180716000000"
)
local shops
shops, err, errcode, sqlstate = db:query(sql)
if not shops then
zx_base:log(MODULE, "[TK_SHOP]", sql)
zx_base:log(MODULE, "[TK_SHOP]", "failed to query:", json.encode(tk_biz), ":",
local syncs
syncs, err, errcode, sqlstate = db:query(sql)
if not syncs then
zx_base:log(MODULE, "[TK_SYNC]", sql)
zx_base:log(MODULE, "[TK_SYNC]", "failed to query:", json.encode(tk_biz), ":",
err, ":", errcode, ":", sqlstate, ".")
return
end
if #shops == 0 then
ngx.say("no shops found")
if #syncs == 0 then
ngx.say("no syncs found")
ngx.eof()
return
end
......@@ -75,7 +76,7 @@ local function shop_sql(tk_biz, tk_control)
zx_base:close_db(db)
db, err = mysql:new()
if not db then
zx_base:log(MODULE, "[TK_SHOP]", "failed to instantiate mysql: ", err)
zx_base:log(MODULE, "[TK_SYNC]", "failed to instantiate mysql: ", err)
return false
end
-- 连接总控机MySQL
......@@ -89,20 +90,20 @@ local function shop_sql(tk_biz, tk_control)
charset = 'utf8'
}
if not ok then
zx_base:log(MODULE, "[TK_SHOP]", "failed to connect:", json.encode(tk_biz), ":", errcode, sqlstate)
zx_base:log(MODULE, "[TK_SYNC]", "failed to connect:", json.encode(tk_biz), ":", errcode, sqlstate)
return
end
-- 插入总控机
for i = 1, #shops do
local shop = shops[i]
local sql_result = zx_base:sql_concate("duplicate", "tk_shop", shop)
for i = 1, #syncs do
local sync = syncs[i]
local sql_result = zx_base:sql_concate("duplicate", tk_table, sync)
ok, err, errcode, sqlstate = db:query(sql_result)
if not ok then
zx_base:log(MODULE, "[TK_SHOP]", sql_result)
zx_base:log(MODULE, "[TK_SHOP]", "failed to query:", json.encode(tk_control), ":",
zx_base:log(MODULE, "[TK_SYNC]", sql_result)
zx_base:log(MODULE, "[TK_SYNC]", "failed to query:", json.encode(tk_control), ":",
err, ":", errcode, ":", sqlstate, ".")
end
end
......@@ -112,10 +113,10 @@ local function shop_sql(tk_biz, tk_control)
end
local function async(tk_biz, tk_control)
local function async(tk_biz, tk_control, tk_database, tk_table)
local co = coroutine.wrap(
function()
shop_sql(tk_biz, tk_control)
sync_sql(tk_biz, tk_control, tk_database, tk_table)
end
)
table.insert(tasks, co)
......@@ -140,12 +141,46 @@ local function dispatch()
end
end
function _SHOP.run()
function TK_SYNC.run()
local dbs_st = zx_base:db_read(db_conf)
for i = 1, #dbs_st.tk_biz do
async(dbs_st.tk_biz[i], dbs_st.tk_control)
async(dbs_st.tk_biz[i], dbs_st.tk_control, dbs_st.tk_biz[i].database, "tk_shop")
async(dbs_st.tk_biz[i], dbs_st.tk_control, dbs_st.tk_biz[i].database, "tk_casher")
end
dispatch()
end
return _SHOP
function TK_SYNC.sync()
local dbs_st = zx_base:db_read(db_conf)
if ngx.req.get_method() ~= "POST" then
ngx.say(json.encode({mesgRetCode="A7", mesgRetDesc="invalid method"}))
else
local data = json.decode(ngx.var.request_body)
if data.mesgType ~= "table_sync" then
ngx.say(json.encode({mesgRetcode="N1", mesgRetDesc="mesgType invalid"}))
elseif data.dbHost == ngx.null then
ngx.say(json.encode({mesgRetCode="N0", mesgRetDesc="dbHost missing"}))
else
for i = 1, #dbs_st.tk_biz do
if dbs_st.tk_biz[i].host == data.dbHost then
if data.dbTable == ngx.null then
ngx.say(json.encode({mesgRetCode="N0", mesgRetDesc="dbTable missing"}))
else
async(dbs_st.tk_biz[i], dbs_st.tk_control, dbs_st.tk_biz[i].database, data.dbTable)
return ngx.say(json.encode({mesgRetCode="00", mesgRetDesc="SUCCESS"}))
end
end
end
ngx.say(json.encode({mesgRetCode="N1", mesgRetDesc="dbHost invalid"}))
end
end
ngx.eof()
end
return TK_SYNC
......@@ -10,7 +10,6 @@ local __loaded_mods = {}
-- @param string namespace 模块名
-- @return table 模块
function _G.loadmod(namespace)
package.path = package.path ..string.format(";nginx/%s/?.lua;;", ngx.var.biz_dir)
-- 查找系统模块
local module = __loaded_mods[namespace]
......@@ -19,7 +18,8 @@ function _G.loadmod(namespace)
end
-- 查找项目模块
local p_namespace = ngx.var.biz_dir .. '.' .. namespace
-- local p_namespace = ngx.var.biz_dir .. '.' .. namespace
local p_namespace = namespace
local p_module = __loaded_mods[p_namespace]
if p_module then
return p_module
......
......@@ -74,7 +74,7 @@ After all clients are serviced, old worker processes are shut down.
Let’s illustrate this by example.
Imagine that nginx is run on FreeBSD 4.x and the command
Imagine that nginx is run on FreeBSD and the command
ps axw -o pid,ppid,user,%cpu,vsz,wchan,command | egrep '(nginx|PID)'
......@@ -264,8 +264,8 @@ the F<.oldbin> suffix from the file name with the process ID.
If upgrade was successful, then the old master process should be sent
the QUIT signal, and only new processes will stay:
If upgrade was successful, then the QUIT signal should be sent to
the old master process, and only new processes will stay:
PID PPID USER %CPU VSZ WCHAN COMMAND
......
......@@ -115,7 +115,7 @@ images and requests starting with "E<sol>downloadE<sol>".
error_page 404 /404.html;
location /404.html {
location = /404.html {
root /spool/www;
}
......
......@@ -1342,7 +1342,7 @@ Example:
error_page 404 /404.html;
location /404.html {
location = /404.html {
internal;
}
......@@ -2197,9 +2197,12 @@ C<reuseport>
this parameter (1.9.1) instructs to create an individual listening socket
for each worker process
(using the C<SO_REUSEPORT> socket option), allowing a kernel
(using the
C<SO_REUSEPORT> socket option on Linux 3.9+ and DragonFly BSD,
or C<SO_REUSEPORT_LB> on FreeBSDE<nbsp>12+), allowing a kernel
to distribute incoming connections between worker processes.
This currently works only on Linux 3.9+ and DragonFly BSD.
This currently works only on Linux 3.9+, DragonFly BSD,
and FreeBSD 12+ (1.15.1).
B<NOTE>
......
......@@ -1221,7 +1221,7 @@ server has returned an invalid response.
This allows handling application errors in nginx, for example:
location /php {
location /php/ {
fastcgi_pass backend:9000;
...
fastcgi_catch_stderr "PHP Fatal error";
......
......@@ -278,11 +278,11 @@ instead of C<$uri>
server {
...
location /hls {
location /hls/ {
hls;
hls_forward_args on;
alias /var/videos;
alias /var/videos/;
secure_link $arg_md5,$arg_expires;
secure_link_md5 "$secure_link_expires$hls_uri$remote_addr secret";
......
......@@ -20,29 +20,9 @@ a subset of the JavaScript language.
This module is not built by default, it should be compiled with
the njs module using the
C<--add-module> configuration parameter:
./configure --add-module=<value>path-to-njs</value>/nginx
The L<repository|http://hg.nginx.org/njs>
with the njs module can be cloned with the following command
(requires L<Mercurial|https://www.mercurial-scm.org> client):
hg clone http://hg.nginx.org/njs
This module can also be built as
L<dynamic|ngx_core_module>:
./configure --add-dynamic-module=<value>path-to-njs</value>/nginx
This module is not built by default.
Download and install instructions are available
L<here|njs_about>.
......@@ -53,6 +33,10 @@ L<dynamic|ngx_core_module>:
load_module modules/ngx_http_js_module.so;
...
http {
js_include http.js;
js_set $foo foo;
......@@ -66,9 +50,14 @@ L<dynamic|ngx_core_module>:
js_content baz;
}
location /summary {
location = /summary {
return 200 $summary;
}
location = /hello {
js_content hello;
}
}
}
......@@ -80,46 +69,50 @@ L<dynamic|ngx_core_module>:
The F<http.js> file:
function foo(req, res) {
req.log("hello from foo() handler");
function foo(r) {
r.log("hello from foo() handler");
return "foo";
}
function summary(req, res) {
function summary(r) {
var a, s, h;
s = "JS summary\n\n";
s += "Method: " + req.method + "\n";
s += "HTTP version: " + req.httpVersion + "\n";
s += "Host: " + req.headers.host + "\n";
s += "Remote Address: " + req.remoteAddress + "\n";
s += "URI: " + req.uri + "\n";
s += "Method: " + r.method + "\n";
s += "HTTP version: " + r.httpVersion + "\n";
s += "Host: " + r.headersIn.host + "\n";
s += "Remote Address: " + r.remoteAddress + "\n";
s += "URI: " + r.uri + "\n";
s += "Headers:\n";
for (h in req.headers) {
s += " header '" + h + "' is '" + req.headers[h] + "'\n";
for (h in r.headersIn) {
s += " header '" + h + "' is '" + r.headersIn[h] + "'\n";
}
s += "Args:\n";
for (a in req.args) {
s += " arg '" + a + "' is '" + req.args[a] + "'\n";
for (a in r.args) {
s += " arg '" + a + "' is '" + r.args[a] + "'\n";
}
return s;
}
function baz(req, res) {
res.headers.foo = 1234;
res.status = 200;
res.contentType = "text/plain; charset=utf-8";
res.contentLength = 15;
res.sendHeader();
res.send("nginx");
res.send("java");
res.send("script");
res.finish();
function baz(r) {
r.status = 200;
r.headersOut.foo = 1234;
r.headersOut['Content-Type'] = "text/plain; charset=utf-8";
r.headersOut['Content-Length'] = 15;
r.sendHeader();
r.send("nginx");
r.send("java");
r.send("script");
r.finish();
}
function hello(r) {
r.return(200, "Hello world!");
}
......@@ -196,13 +189,12 @@ Sets an njs function for the specified variable.
=head1 Request and Response Arguments
=head1 Request Argument
Each HTTP njs handler receives two arguments,
L<request|njs_api>
and L<response|njs_api>.
Each HTTP njs handler receives one argument, a request
L<object|njs_api>.
......
......@@ -30,7 +30,7 @@ Responses to mirror subrequests are ignored.
proxy_pass http://backend;
}
location /mirror {
location = /mirror {
internal;
proxy_pass http://test_backend$request_uri;
}
......@@ -112,7 +112,7 @@ directives will be disabled.
proxy_pass http://backend;
}
location /mirror {
location = /mirror {
internal;
proxy_pass http://log_backend;
proxy_pass_request_body off;
......
......@@ -606,7 +606,7 @@ L<ngx_http_core_module>
directive (1.13.10):
location /remote {
location /remote/ {
subrequest_output_buffer_size 64k;
...
}
......
......@@ -31,7 +31,7 @@ configuration parameter.
location /basic_status {
location = /basic_status {
stub_status;
}
......
......@@ -1111,6 +1111,46 @@ commercial subscription.
=head2 random
B<syntax:> random I<[C<two> [I<C<method>>]]>
B<context:> I<upstream>
This directive appeared in version 1.15.1.
Specifies that a group should use a load balancing method where a request
is passed to a randomly selected server, taking into account weights
of servers.
The optional C<two> parameter
instructs nginx to randomly select
L<two|https://homes.cs.washington.edu/~karlin/papers/balls.pdf>
servers and then choose a server
using the specified C<method>.
The default method is C<least_conn>
which passes a request to a server
with the least number of active connections.
=head2 sticky
......
......@@ -254,9 +254,12 @@ C<reuseport>
this parameter (1.9.1) instructs to create an individual listening socket
for each worker process
(using the C<SO_REUSEPORT> socket option), allowing a kernel
(using the
C<SO_REUSEPORT> socket option on Linux 3.9+ and DragonFly BSD,
or C<SO_REUSEPORT_LB> on FreeBSD 12+), allowing a kernel
to distribute incoming connections between worker processes.
This currently works only on Linux 3.9+ and DragonFly BSD.
This currently works only on Linux 3.9+, DragonFly BSD,
and FreeBSD 12+ (1.15.1).
B<NOTE>
......
......@@ -19,29 +19,9 @@ a subset of the JavaScript language.
This module is not built by default, it should be compiled with
the njs module using the
C<--add-module> configuration parameter:
./configure --add-module=<value>path-to-njs</value>/nginx
The L<repository|http://hg.nginx.org/njs>
with the njs module can be cloned with the following command
(requires L<Mercurial|https://www.mercurial-scm.org> client):
hg clone http://hg.nginx.org/njs
This module can also be built as
L<dynamic|ngx_core_module>:
./configure --add-dynamic-module=<value>path-to-njs</value>/nginx
This module is not built by default.
Download and install instructions are available
L<here|njs_about>.
......@@ -52,6 +32,9 @@ L<dynamic|ngx_core_module>:
load_module modules/ngx_stream_js_module.so;
...
stream {
js_include stream.js;
......
......@@ -81,6 +81,28 @@ Selecting an upstream based on protocol:
Selecting an upstream based on SSL protocol version:
map $ssl_preread_protocol $upstream {
"" ssh.example.com:22;
"TLSv1.2" new.example.com:443;
default tls.example.com:443;
}
# ssh and https on the same port
server {
listen 192.168.0.1:443;
proxy_pass $upstream;
ssl_preread on;
}
=head1 Directives
=head2 ssl_preread
......@@ -119,6 +141,15 @@ the L<preread|stream_processing> phase.
=item C<$ssl_preread_protocol>
the highest SSL protocol version supported by the client (1.15.2)
=item C<$ssl_preread_server_name>
......
......@@ -651,11 +651,15 @@ weighted round-robin balancing method.
If the C<connect> parameter is specified,
time to connect to the upstream server is used.
time to
connect
to the upstream server is used.
If the C<first_byte> parameter is specified,
time to receive the first byte of data is used.
time to receive the
first byte of data is used.
If the C<last_byte> is specified,
time to receive the last byte of data is used.
time to receive the
last byte of data is used.
If the C<inflight> parameter is specified (1.11.6),
incomplete connections are also taken into account.
......@@ -682,6 +686,46 @@ commercial subscription.
=head2 random
B<syntax:> random I<[C<two> [I<C<method>>]]>
B<context:> I<upstream>
This directive appeared in version 1.15.1.
Specifies that a group should use a load balancing method where a connection
is passed to a randomly selected server, taking into account weights
of servers.
The optional C<two> parameter
instructs nginx to randomly select
L<two|https://homes.cs.washington.edu/~karlin/papers/balls.pdf>
servers and then choose a server
using the specified C<method>.
The default method is C<least_conn>
which passes a connection to a server
with the least number of active connections.
=head1 Embedded Variables
......
This diff is collapsed.
......@@ -7,6 +7,236 @@
njs_changes - njs Changes
=head1 Changes with 0.2.2
Release Date:
19 June 2018
nginx modules:
=over
=item *
Change:
merged HTTP C<Response> and C<Reply>
into L<HTTP Request|njs_api>.
New members of C<Request>:
=over
=item *
C<req.status> (C<res.status>)
=item *
C<req.parent> (C<reply.parent>)
=item *
C<req.requestBody> (C<req.body>)
=item *
C<req.responseBody> (C<reply.body>)
=item *
C<req.headersIn> (C<req.headers>)
=item *
C<req.headersOut> (C<res.headers>)
=item *
C<req.sendHeader()> (C<res.sendHeader()>)
=item *
C<req.send()> (C<res.send()>)
=item *
C<req.finish()> (C<res.finish()>)
=item *
C<req.return()> (C<res.return()>)
=back
Deprecated members of C<Request>:
=over
=item *
C<req.body> (use C<req.requestBody>
or C<req.responseBody>)
=item *
C<req.headers> (use C<req.headersIn>
or C<req.headersOut>)
=item *
C<req.response>
=back
The deprecated properties will be removed in next releases.
=item *
Feature:
HTTP L<internalRedirect()|njs_api>
method.
=back
Core:
=over
=item *
Bugfix:
fixed heap-buffer-overflow in C<crypto.createHmac()>.
=back
=head1 Changes with 0.2.1
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment