#include "acq_vendor.h" #include #include #include #include "load_json_file.h" #include "export_yaml_file.h" static vendor::MyOpt myOpts_[] = { {"help", no_argument, 'h', "[options] [documents] [IPaddress][:port]..."}, {"auth", required_argument, 0, "User and role configuration"}, {"debug", no_argument, 0, "Run in debug mode"}, {"load-json", required_argument, 0, "load devices from json file"}, {"export-yaml", required_argument, 0, "exports devices to yaml file"}, {"log", required_argument, 0, "logFile:level, Log to file file at verbosity level"}, {"route", required_argument, 0, "route file"}, {"port", required_argument, 0, "set port, default: 8880"}, {"delay", required_argument, 0, "delay secs to quit"}, {"document", required_argument, 0, "document"}, {"verbose", required_argument, 0, "Same as --log stderr:2"}, {"version", no_argument, 0, "version information"}}; namespace iot_acq { AcqVendor::AcqVendor() : acqTask_(new AcqTask()) , dataAcqTask_(std::make_shared("leo-acq")) { rpcListener_ = {// onUpdateDevices .onUpdateDevices = [&](const std::vector &) {}, // onUpdateChannels .onUpdateChannels = [&](const std::vector &) { }, // onUpdateFile .onUpdateFile = [&](const std::string &filePath) { LoadJsonFile loadJson; Json::Value root; loadJson.LoadJson(filePath, root); loadJson.ParseJson(root); SetEvent(eventStart_); }, // onDeviceWrite .onDeviceWrite = [&](const std::string &deviceId, const std::vector &items) -> dbms::AcqData_t { return dataAcqTask_->OnDeviceCommand(deviceId, items); }, // onDeviceRead .onDeviceRead = [&](const std::string &deviceId, const std::vector &items) -> dbms::AcqData_t { return dataAcqTask_->OnDeviceCommand(deviceId, items); }, .onDeviceCommand = [&](int32_t channelId, const std::vector &cmds) -> std::vector { return dataAcqTask_->OnDeviceCommand(channelId, cmds); }}; } AcqVendor::~AcqVendor() {} int AcqVendor::ParseCmdline(const std::string &optname, const std::string &optarg) { if (optname == "version") { printf(DAS_VERSION); exit(0); } else if (optname == "load-json") { LoadJsonFile loadJson; Json::Value root; loadJson.LoadJson(optarg, root); loadJson.ParseJson(root); exit(0); } else if (optname == "export-yaml") { ExportYamlFile yamlFile; yamlFile.ExportYaml(optarg); exit(0); } return 0; } vendor::MyOpt *AcqVendor::GetOpts() { return myOpts_; } int AcqVendor::GetOptSize() { return sizeof(myOpts_) / sizeof(myOpts_[0]); } void sigHandler(int sig) { HTELINK_LOG_DEBUG("sigHandler, %d", sig); } int AcqVendor::Run() { std::vector dataCenters = configParser_.GetDataCenter(); for(settings::ConfigParser::DataCenter &dc: dataCenters) { HTELINK_LOG_INFO("add rpc %s, yunId = %d", dc.name.c_str(), dc.yunID); std::shared_ptr rpc = leoyun::YunRpc::GetRpcInstance(dc.yunID); ASSERT(rpc != nullptr); rpc->SetDataCenter(dc); rpc->RegisterRpc(rpcListener_); acqTask_->PushRpc(dc.name, rpc); } AppendTimerEvent(10 * 60 * 1000, false, [&] { /* move files if disk full */ utils::check_disk_log(VENDOR_LOG_PATH, configParser_.GetLogDuration()); }); timerCheck_ = AppendTimerEvent(5000, false, acqTask_); eventStart_ = AppendEvent([&]() { HTELINK_TRACE("Load Channels"); dataAcqTask_->LoadChannels(); dataAcqTask_->StartAll(); }); eventStop_ = AppendEvent([&]() { HTELINK_TRACE("Unload Channels"); dataAcqTask_->StopAll(); }); dataAcqTask_->LoadProtocol(VENDOR_CONFIG_PATH + "/xcom_parser.xcom"); dataAcqTask_->Subscribe( [&](const dbms::Device_t &device, const dbms::AcqData_t &acq) -> bool { bool result = true; acqTask_->ForEach([&](const std::shared_ptr &rpc) { if (!rpc->IsOnline()) { result = false; return; } if (!rpc->OnDataUpdate(device, acq)) { result = false; } }); return result; }); dataAcqTask_->SubscribeException( [&](const dbms::Device_t &device, const std::string &message) { acqTask_->ForEach([&](const std::shared_ptr &rpc) { rpc->OnException(device, message); }); }); SetEvent(eventStart_); return 0; } std::string AcqVendor::Name() { return "iot-acq"; } void AcqVendor::Stop(int signal) { HTELINK_LOG_INFO("acq exit, signal = %d", signal); // StopEvent(timerCheck_); if (eventStop_ != nullptr) { SetEvent(eventStop_); } } } // namespace iot_acq