123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- #include "acq_vendor.h"
- #include <communications/communication.h>
- #include <json/value.h>
- #include <settings/config_parser.h>
- #include "load_json_file.h"
- #include "export_yaml_file.h"
- static vendor::MyOpt myOpts_[] = {
- {"help", no_argument, 'h', "[options] [documents] [IPaddress][:port]..."},
- {"auth", required_argument, 0, "User and role configuration"},
- {"debug", no_argument, 0, "Run in debug mode"},
- {"load-json", required_argument, 0, "load devices from json file"},
- {"export-yaml", required_argument, 0, "exports devices to yaml file"},
- {"log", required_argument, 0,
- "logFile:level, Log to file file at verbosity level"},
- {"route", required_argument, 0, "route file"},
- {"port", required_argument, 0, "set port, default: 8880"},
- {"delay", required_argument, 0, "delay secs to quit"},
- {"document", required_argument, 0, "document"},
- {"verbose", required_argument, 0, "Same as --log stderr:2"},
- {"version", no_argument, 0, "version information"}};
- namespace iot_acq {
- AcqVendor::AcqVendor()
- : acqTask_(new AcqTask())
- , dataAcqTask_(std::make_shared<data_acq::DataAcqTask>("leo-acq"))
- {
- rpcListener_ = {// onUpdateDevices
- .onUpdateDevices = [&](const std::vector<dbms::Device_t> &) {},
- // onUpdateChannels
- .onUpdateChannels =
- [&](const std::vector<dbms::DeviceChannel_t> &) {
- },
- // onUpdateFile
- .onUpdateFile =
- [&](const std::string &filePath) {
- LoadJsonFile loadJson;
- Json::Value root;
- loadJson.LoadJson(filePath, root);
- loadJson.ParseJson(root);
- SetEvent(eventStart_);
- },
- // onDeviceWrite
- .onDeviceWrite =
- [&](const std::string &deviceId,
- const std::vector<dbms::AcqItem_t> &items) -> dbms::AcqData_t {
- return dataAcqTask_->OnDeviceCommand(deviceId, items);
- },
- // onDeviceRead
- .onDeviceRead =
- [&](const std::string &deviceId,
- const std::vector<dbms::AcqItem_t> &items) -> dbms::AcqData_t {
- return dataAcqTask_->OnDeviceCommand(deviceId, items);
- },
- .onDeviceCommand =
- [&](int32_t channelId,
- const std::vector<uint8_t> &cmds) -> std::vector<uint8_t> {
- return dataAcqTask_->OnDeviceCommand(channelId, cmds);
- }};
- }
- AcqVendor::~AcqVendor() {}
- int AcqVendor::ParseCmdline(const std::string &optname, const std::string &optarg)
- {
- if (optname == "version") {
- printf(DAS_VERSION);
- exit(0);
- } else if (optname == "load-json") {
- LoadJsonFile loadJson;
- Json::Value root;
- loadJson.LoadJson(optarg, root);
- loadJson.ParseJson(root);
- exit(0);
- } else if (optname == "export-yaml") {
- ExportYamlFile yamlFile;
- yamlFile.ExportYaml(optarg);
- exit(0);
- }
- return 0;
- }
- vendor::MyOpt *AcqVendor::GetOpts()
- {
- return myOpts_;
- }
- int AcqVendor::GetOptSize()
- {
- return sizeof(myOpts_) / sizeof(myOpts_[0]);
- }
- void sigHandler(int sig)
- {
- HTELINK_LOG_DEBUG("sigHandler, %d", sig);
- }
- int AcqVendor::Run()
- {
- std::vector<settings::ConfigParser::DataCenter> dataCenters = configParser_.GetDataCenter();
- for(settings::ConfigParser::DataCenter &dc: dataCenters) {
- HTELINK_LOG_INFO("add rpc %s, yunId = %d", dc.name.c_str(), dc.yunID);
- std::shared_ptr<leoyun::YunRpc> rpc = leoyun::YunRpc::GetRpcInstance(dc.yunID);
- ASSERT(rpc != nullptr);
- rpc->SetDataCenter(dc);
- rpc->RegisterRpc(rpcListener_);
- acqTask_->PushRpc(dc.name, rpc);
- }
- AppendTimerEvent(10 * 60 * 1000, false, [&] {
- /* move files if disk full */
- utils::check_disk_log(VENDOR_LOG_PATH, configParser_.GetLogDuration());
- });
- timerCheck_ = AppendTimerEvent(5000, false, acqTask_);
- eventStart_ = AppendEvent([&]() {
- HTELINK_TRACE("Load Channels");
- dataAcqTask_->LoadChannels();
- dataAcqTask_->StartAll();
- });
- eventStop_ = AppendEvent([&]() {
- HTELINK_TRACE("Unload Channels");
- dataAcqTask_->StopAll();
- });
- dataAcqTask_->LoadProtocol(VENDOR_CONFIG_PATH + "/xcom_parser.xcom");
- dataAcqTask_->Subscribe(
- [&](const dbms::Device_t &device, const dbms::AcqData_t &acq) -> bool {
- bool result = true;
- acqTask_->ForEach([&](const std::shared_ptr<leoyun::YunRpc> &rpc) {
- if (!rpc->IsOnline()) {
- result = false;
- return;
- }
- if (!rpc->OnDataUpdate(device, acq)) {
- result = false;
- }
- });
- return result;
- });
- dataAcqTask_->SubscribeException(
- [&](const dbms::Device_t &device, const std::string &message) {
- acqTask_->ForEach([&](const std::shared_ptr<leoyun::YunRpc> &rpc) {
- rpc->OnException(device, message);
- });
- });
- SetEvent(eventStart_);
- return 0;
- }
- std::string AcqVendor::Name()
- {
- return "iot-acq";
- }
- void AcqVendor::Stop(int signal)
- {
- HTELINK_LOG_INFO("acq exit, signal = %d", signal);
- // StopEvent(timerCheck_);
- if (eventStop_ != nullptr) {
- SetEvent(eventStop_);
- }
- }
- } // namespace iot_acq
|