https://www.npmjs.com/package/multipipe
https://github.com/zoubin/streamify-your-node-program
http://www.xiedacon.com/2017/08/11/Node.js%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90-pipe%E5%AE%9E%E7%8E%B0/#Node-js%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90-pipe%E5%AE%9E%E7%8E%B0
-----------------------------------------------------------------------
const _ = require("underscore");
require("underscore_extension")(_);
const $path = require("path");
const $fs = require('fs');
const $zlib = require('zlib');
http();
function http() {
let server = require("http").createServer(async function (req, resp) {
resp.writeHead(200, {
'Content-Type': 'image/jpg',
'content-encoding': 'gzip'
});
let _path = $path.resolve(__dirname, './images/plane_1.jpg');
let reader = $fs.createReadStream(_path);
reader = reader.pipe($zlib.createGzip());
await pipe(reader, resp);
resp.end();
});
server.listen(8081);
}
function _error(res) {
res.writeHead(200, {
'Content-Type': 'text/html'
});
console.log('error...')
res.end('<h1>error</h1>');
}
function pipe(reader, writer, size) {
let def = _.deferred();
reader.on('readable', function () {
// 採用靜態模式
console.log('----------readable----------------');
write();
});
reader.on('data', function (chunk) {
// reader.read() 會進來此
console.log('on data(%s)', chunk.length);
});
writer.on('drain', function () {
console.log('drain..........');
let data;
// 讀取之未讀完的記憶體
if (!write(true)) {
return;
}
console.log('舊的讀取完');
reader.resume();
});
reader.on('end', function () {
console.log('end');
def.resolve();
});
//----------------------------
function write(noPause) {
console.log('....write')
while (true) {
console.log('read(%s)', (noPause ? 'flush' : 'ok'));
let data = reader.read(size);
if (data === null) {
console.log('read end');
break;
} else {
if (writer.write(data) === false) {
console.log('too much pause');
if (!noPause) {
console.log('pause...');
reader.pause();
}
return false;
}
}
}
return true;
}
//----------------------------
return def.promise();
}
2019年1月24日 星期四
2019年1月23日 星期三
2019年1月19日 星期六
node.js stream.pipe
Stream.prototype.pipe = function(dest, options) {
var source = this;
source.on('data', function (chunk) {
if (dest.writable) {
if (false === dest.write(chunk) && source.pause) {
source.pause();
}
}
});
dest.on('drain', function () {
if (source.readable && source.resume) {
source.resume();
}
});
dest.emit('pipe', source);
// Allow for unix-like usage: A.pipe(B).pipe(C)
return dest;
};
var source = this;
source.on('data', function (chunk) {
if (dest.writable) {
if (false === dest.write(chunk) && source.pause) {
source.pause();
}
}
});
dest.on('drain', function () {
if (source.readable && source.resume) {
source.resume();
}
});
dest.emit('pipe', source);
// Allow for unix-like usage: A.pipe(B).pipe(C)
return dest;
};
2019年1月18日 星期五
node.js 進程池
const cp = require('child_process')
function doWork(job, callback) {
// TODO worker 是 工作进程,下一步进行实现
let child = cp.fork('./worker')
// 用来跟踪回调的执行状态,防止出现多次回调调用
let cbTriggered = false;
child.once('error', function (err) {
if (!cbTriggered) {
callback(err)
cbTriggered = true;
}
// 异常错误时,杀死子进程
child.kill();
})
.once('exit', function (code, signal) {
if (!cbTriggered) {
callback(new Error('Child exited with code: ' + code));
}
})
.once('message', function (result) {
callback(null, result)
cbTriggered = true;
})
.send(job);
}
//--------------------------------------
// pooler.js
const cp = require('child_process');
// 取到cpu 数量
const cpus = require('os').cpus().length;
module.exports = function (workModule) {
// 当没有空闲的cpu时,用来存放任务队列
let awaiting = [];
// 存放准备就绪的工作进程
let readyPool = [];
// 存放现有的工作者进程的数量
let poolSize = 0;
return function doWork(job, callback) {
// 如果没有空闲的工作进程并且工作进程达到了限制,就把后续任务放入队列等待执行
if (!readyPool.length && poolSize > cpus) {
return awaiting.push([doWork, job, callback])
}
// 获取下一个可用的子进程或者fork一个新的进程并增加池子大小
let child = readyPool.length
? readyPool.shift()
: (poolSize++ , cp.fork(workModule));
let cbTriggered = false;
// 移除子进程上的所有监听,保证每一个子进程每次只有一个监听
child
.removeAllListeners()
.once('error', function (err) {
if (!cbTriggered) {
callback(err)
cbTriggered = true;
}
child.kill();
})
.once('exit', function (code, signal) {
if (!cbTriggered) {
callback(new Error('Child exited with code: ' + code));
}
poolSize--;
// 如果子进程由于某种原因退出,保证在readyPool中也被移除了
let childIdx = readyPool.indexOf(child);
if (childIdx > -1) {
readyPool.splice(childIdx, 1);
}
})
.once('message', function (result) {
callback(null, result)
cbTriggered = true;
// 把空闲的子进程再次放入readyPool,等待接下来的任务
readyPool.push(child);
if (awaiting.length) {
setImmediate.apply(null, awaiting.shift());
}
})
.send(job);
}
}
function doWork(job, callback) {
// TODO worker 是 工作进程,下一步进行实现
let child = cp.fork('./worker')
// 用来跟踪回调的执行状态,防止出现多次回调调用
let cbTriggered = false;
child.once('error', function (err) {
if (!cbTriggered) {
callback(err)
cbTriggered = true;
}
// 异常错误时,杀死子进程
child.kill();
})
.once('exit', function (code, signal) {
if (!cbTriggered) {
callback(new Error('Child exited with code: ' + code));
}
})
.once('message', function (result) {
callback(null, result)
cbTriggered = true;
})
.send(job);
}
//--------------------------------------
// pooler.js
const cp = require('child_process');
// 取到cpu 数量
const cpus = require('os').cpus().length;
module.exports = function (workModule) {
// 当没有空闲的cpu时,用来存放任务队列
let awaiting = [];
// 存放准备就绪的工作进程
let readyPool = [];
// 存放现有的工作者进程的数量
let poolSize = 0;
return function doWork(job, callback) {
// 如果没有空闲的工作进程并且工作进程达到了限制,就把后续任务放入队列等待执行
if (!readyPool.length && poolSize > cpus) {
return awaiting.push([doWork, job, callback])
}
// 获取下一个可用的子进程或者fork一个新的进程并增加池子大小
let child = readyPool.length
? readyPool.shift()
: (poolSize++ , cp.fork(workModule));
let cbTriggered = false;
// 移除子进程上的所有监听,保证每一个子进程每次只有一个监听
child
.removeAllListeners()
.once('error', function (err) {
if (!cbTriggered) {
callback(err)
cbTriggered = true;
}
child.kill();
})
.once('exit', function (code, signal) {
if (!cbTriggered) {
callback(new Error('Child exited with code: ' + code));
}
poolSize--;
// 如果子进程由于某种原因退出,保证在readyPool中也被移除了
let childIdx = readyPool.indexOf(child);
if (childIdx > -1) {
readyPool.splice(childIdx, 1);
}
})
.once('message', function (result) {
callback(null, result)
cbTriggered = true;
// 把空闲的子进程再次放入readyPool,等待接下来的任务
readyPool.push(child);
if (awaiting.length) {
setImmediate.apply(null, awaiting.shift());
}
})
.send(job);
}
}
2019年1月16日 星期三
node.js http 讀取 stream
function pstreamPipe(path, callback, code){
return (streamPipe(path, callback, code)[0]);
}
function streamPipe(path, callback, code) {
let def = _.deferred();
let stream = require("fs").createReadStream(path);
if (code) {
stream.setEncoding(code);
}
stream.on('data', function (data) {
callback(data);
});
stream.on('error', function (err) {
def.reject(err);
});
stream.on('end', function () {
def.resolve();
});
return [def.promise(), stream];
}
return (streamPipe(path, callback, code)[0]);
}
function streamPipe(path, callback, code) {
let def = _.deferred();
let stream = require("fs").createReadStream(path);
if (code) {
stream.setEncoding(code);
}
stream.on('data', function (data) {
callback(data);
});
stream.on('error', function (err) {
def.reject(err);
});
stream.on('end', function () {
def.resolve();
});
return [def.promise(), stream];
}
一個 java 泛型的例子
public interface X<T> {
public void setData(T data);
public T getData();
}
public class Y<T> implements X<T> {
public T data;
@Override
public void setData(T data) {
this.data = data;
}
public T getData() {
return this.data;
}
}
public class Done {
/**
* @param args the command line arguments
*/
public static void main(String[] args) {
X<String> x = new Y();
x.setData("hi");
out.println(x.getData());
}
}
public void setData(T data);
public T getData();
}
public class Y<T> implements X<T> {
public T data;
@Override
public void setData(T data) {
this.data = data;
}
public T getData() {
return this.data;
}
}
public class Done {
/**
* @param args the command line arguments
*/
public static void main(String[] args) {
X<String> x = new Y();
x.setData("hi");
out.println(x.getData());
}
}
swing 的組織想法
// 中間協調者
public class Observeable<T> {
// 要通知的組件
public ArrayList<Observe> elements;
public void emit(String eventName, T data){
}
}
public interface Observe<T> {
public void on(String eventName);
public void off(String eventName);
public void notify(String eventName, T data);
}
public class Element<T, E> implements Observe<E> {
// swing 元件
public T x;
@Override
public void on(String eventName) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
@Override
public void off(String eventName) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
@Override
public void notify(String eventName, E data) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
}
public class Observeable<T> {
// 要通知的組件
public ArrayList<Observe> elements;
public void emit(String eventName, T data){
}
}
public interface Observe<T> {
public void on(String eventName);
public void off(String eventName);
public void notify(String eventName, T data);
}
public class Element<T, E> implements Observe<E> {
// swing 元件
public T x;
@Override
public void on(String eventName) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
@Override
public void off(String eventName) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
@Override
public void notify(String eventName, E data) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
}
2019年1月15日 星期二
2019年1月8日 星期二
javascript eval() 常會錯誤難解的地方
let l = "aaa\"bbb\"aaa";
let command = "let x = \"" + l + "\";";
console.log(command); // >>> let x = "aaa"bbb"aaa";
eval(command);
//-------------------------------------------
// 解法
let l = "aaa\"bbb\"aaa"; (看作 aaa"bbb"aaa)
l = l.replace(/\"|\'/g, function(m){
return ('\\' + m);
});
(把 i 看作 aaa\"bbb\"aaa)
let command = "let x = \"" + l + "\";";
console.log(command); // >>> let x = "aaa"bbb"aaa";
eval(command);
//-------------------------------------------
// 解法
let l = "aaa\"bbb\"aaa"; (看作 aaa"bbb"aaa)
l = l.replace(/\"|\'/g, function(m){
return ('\\' + m);
});
(把 i 看作 aaa\"bbb\"aaa)
特殊字符\u2028導致的Javascript腳本異常
網上查詢得知,這個編碼為2028的字符為行分隔符,會被瀏覽器理解為換行,而在Javascript的字符串表達式中是不允許換行的,從而導致錯誤。
解決方法
把特殊字符轉義替換即可,代碼如下所示:
1
str = str.Replace("\u2028", "\\u2028");
替換後,用之前有問題的文章測試,加載正常,問題解決。
另外,Javascript中的特殊字符一共有13個,建議都進行轉義處理,如下:
Unicode 字符值 轉義序列 含義 類別
\u0008 \b Backspace
\u0009 \t Tab 空白
\u000A \n 換行符(換行) 行結束符
\u000B \v 垂直製表符 空白
\u000C \f 換頁 空白
\u000D \r 回車 行結束符
\u0022 \" 雙引號 (")
\u0027 \' 單引號 (')
\u005C \\ 反斜槓 (\)
\u00A0 不間斷空格 空白
\u2028 行分隔符 行結束符
\u2029 段落分隔符 行結束符
\uFEFF 字節順序標記 空白
解決方法
把特殊字符轉義替換即可,代碼如下所示:
1
str = str.Replace("\u2028", "\\u2028");
替換後,用之前有問題的文章測試,加載正常,問題解決。
另外,Javascript中的特殊字符一共有13個,建議都進行轉義處理,如下:
Unicode 字符值 轉義序列 含義 類別
\u0008 \b Backspace
\u0009 \t Tab 空白
\u000A \n 換行符(換行) 行結束符
\u000B \v 垂直製表符 空白
\u000C \f 換頁 空白
\u000D \r 回車 行結束符
\u0022 \" 雙引號 (")
\u0027 \' 單引號 (')
\u005C \\ 反斜槓 (\)
\u00A0 不間斷空格 空白
\u2028 行分隔符 行結束符
\u2029 段落分隔符 行結束符
\uFEFF 字節順序標記 空白
2019年1月5日 星期六
python list.sort
x = [{"age": 15}, {"age": 5}, {"age": 10}]
def sortFn(arg):
'''用 age 作為排序依據'''
return arg['age']
x.sort(key=sortFn)
print(x)
def sortFn(arg):
'''用 age 作為排序依據'''
return arg['age']
x.sort(key=sortFn)
print(x)
訂閱:
文章 (Atom)