vkflow
1. Фабрика Для создания потока достаточно вызвать фабрику, передав в качестве параметров правила фильтрации и сервисный ключ приложения ВКонтакте.
Пример: чтение и отображение сообщений
const vkflow = require('vkflow');
const stream = vkflow(
VK_SERVICE_KEY,
[ { value: 'вконтакте', tag: 'cyrillic' }
, { value: 'vk', tag: 'latin' }
]
);
stream.on('data', console.log);
Каждое событие 'data'
содержит одно сообщение потока в виде строки формата JSON. Сервисные сообщения игнорируются. Указанные правила фильтрации заменят собой те, что были заданы для этого потока ранее, если такие есть. При разрывах соединения, vkflow
автоматически выполнит переподключение. При возникновении ошибок, сработает событие 'error'
с соответствующим объектом ошибки.
Объект, который создаёт фабрика, имплементирует Readable Stream, поэтому для работы с ним можно использовать не только событийный подход, но и потоковый.
Пример: чтение и запись потока в файл с использованием Node Streams:
const { createWriteStream } = require('fs')
const vkflow = require('vkflow')
vkflow(VK_SERVICE_KEY, rules)
.pipe(createWriteStream('destination.dat'))
VKStreamingAPI
и VKWebSocket
2. Объекты Фабрика vkflow
позволяет легко решить задачу чтения потока с предзаданными правилами фильтрации, но если необходимо реализовать иную логику работы с VK Streaming API, то предусмотрены более низкоуровневые сущности VKStreamingAPI
и VKWebSocket
.
VKStreamingAPI
2.1 VKStreamingAPI
— предоставляет методы для HTTP взаимодействий с VK Streaming API: с помощью него можно выполнять авторизацию (получение токена и эндпоинта) и управлять правилами потока.
Поддерживаемые методы:
authWithToken
(serviceKey:String) → Promise
serviceKey
— сервисный ключ приложения ВКонтакте.
Выполняет авторизацию VK Streaming API и возвращает Promise, который резолвится с объектом вида { key: String, endpoint: String }
, где key это ключ доступа, а endpoint это URL для дальнейшей работы c API. Например:
{ endpoint: 'streaming.vk.com',
key: '49e4758265efca1bb0bfceec9a08272b5d015ba0' }
В рассмотренных далее методах endpoint и key — это параметры полученные с помощью
VKStreamingAPI.authWithToken
getRules
(endpoint:String, key:String) → Promise
Запрашивает список всех правил объявленных для потока и возвращает Promise, который резолвится с объектом вида:
{ code: 200,
rules:
[ { tag: 'candidate1', value: 'титов -егор' },
{ tag: 'candidate2', value: 'собчак' },
{ tag: 'candidate3', value: 'навальный' },
{ tag: 'candidate4', value: 'путин' },
{ tag: 'candidate5', value: 'жириновский' },
{ tag: 'candidate6', value: 'явлинский' },
{ tag: 'candidate7', value: 'грудинин' } ] }
postRule
(endpoint:String, key:String, rule:Object) → Promise
Добавляет правило в поток. Здесь rule
— это объект описывающий правило, например:
{ rule: { tag: 'candidate2', value: 'собчак' } }
deleteRule
(endpoint:String, key:String, ruleTag:Object) → Promise
Удаляет правило из потока. Здесь ruleTag
— это объект вида:
{ tag: 'candidate2' }
flushRules
(endpoint:String, key:String) → Promise
Удаляет все правила потока.
getSettings
(serviceKey:String) → Promise
Возвращает объект с единственным полем monthlylimit (string), которое содержит значение tier1-tier_6 или unlimited и соответствует установленному порогу для приложения.
getStats
(serviceKey:String, params:Object) → Promise
Позволяет получить статистику для подготовленных и доставленных событий Streaming API. params
— объект с требованиями к отчету, см. документацию: vk.com/dev/streaming.getStats.
getStem
(serviceKey:String, params:Object) → Promise
Лемматизирует слово, переданное в поле word
объекта params
.
VKWebSocket
2.2 VKWebSocket
— класс для чтения потока через websocket соединение к Streaming API ВКонтакте. Как и объект создаваемый фабрикой vkflow
, инстанс VKWebSocket
имплементирует Readable Stream, а значит чтение потока может осуществляться как через обработку события data
, так и с использованием pipe
.
Для инстанцирования VKWebSocket
необходимо указать адрес соединения и опциональный объект с параметрами подключения.
Пример: чтение потока с использованием VKWebSocket
const VKWebSocket = require('vkflow').VKWebSocket;
const { authWithToken } = require('vkflow').VKStreamingAPI;
(async () => {
const { endpoint, key } = await authWithToken(VK_SERVICE_KEY);
const socket = new VKWebSocket(
`wss://${endpoint}/stream?key=${key}`,
{ socket: { omitServiceMessages: false } }
)
socket.pipe(someWritableStream)
})()
Опциональные параметры подключения:
highWaterMark
(Number) — размер буфера сообщений. Default:32768
;socket.debug
(Boolean) — включение режима логирования (через console.debug()) Default:false
;socket.omitServiceMessages
(Boolean) — игнорировать или нет сервисные сообщения Default:true
;socket.reconnectInterval
(Number) — стартовая периодичность попыток переподключения обрыве соединения. Default:1e3
;socket.maxReconnectInterval
(Number) — максимальная периодичность попыток переподключения обрыве соединения. Default:3e4
;socket.reconnectDecay
(Number) — множитель паузы между последующими попытками подключения. Default:1.5
;socket.timeoutInterval
(Number) — время таймаута одной попытки подключения. Default:2e3
;socket.maxReconnectAttempts
(Number) — лимит количества попыток подключения. Default:null
(нет лимита попыток)
Пример: использование VKStreamingAPI
и VKWebSocket
const VKWebSocket = require('vkflow').VKWebSocket;
const { authWithToken, flushRules, postRule } = require('vkflow').VKStreamingAPI;
const rules = [
{ tag: 'candidate1', value: 'титов -егор' },
{ tag: 'candidate2', value: 'собчак' },
{ tag: 'candidate3', value: 'навальный' },
{ tag: 'candidate4', value: 'путин' },
{ tag: 'candidate5', value: 'жириновский' },
{ tag: 'candidate6', value: 'явлинский' },
{ tag: 'candidate7', value: 'грудинин' }
];
/**
* Выполним авторизацию, удалим старые правила из потока, создадим новые
* и установим websocket соединение для чтения потокa
*/
(async () => {
const { endpoint, key } = await authWithToken(VK_SERVICE_KEY);
await flushRules(endpoint, key);
for (let rule of rules)
await postRule(endpoint, key, { rule });
const socket = new VKWebSocket(
`wss://${endpoint}/stream?key=${key}`,
{ socket: { omitServiceMessages: false } }
)
socket.pipe(process.stdout)
})()