acq_vendor.cpp 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. #include "acq_vendor.h"
  2. #include <communications/communication.h>
  3. #include <json/value.h>
  4. #include <settings/config_parser.h>
  5. #include "load_json_file.h"
  6. #include "export_yaml_file.h"
  7. static vendor::MyOpt myOpts_[] = {
  8. {"help", no_argument, 'h', "[options] [documents] [IPaddress][:port]..."},
  9. {"auth", required_argument, 0, "User and role configuration"},
  10. {"debug", no_argument, 0, "Run in debug mode"},
  11. {"load-json", required_argument, 0, "load devices from json file"},
  12. {"export-yaml", required_argument, 0, "exports devices to yaml file"},
  13. {"log", required_argument, 0,
  14. "logFile:level, Log to file file at verbosity level"},
  15. {"route", required_argument, 0, "route file"},
  16. {"port", required_argument, 0, "set port, default: 8880"},
  17. {"delay", required_argument, 0, "delay secs to quit"},
  18. {"document", required_argument, 0, "document"},
  19. {"verbose", required_argument, 0, "Same as --log stderr:2"},
  20. {"version", no_argument, 0, "version information"}};
  21. namespace iot_acq {
  22. AcqVendor::AcqVendor()
  23. : acqTask_(new AcqTask())
  24. , dataAcqTask_(std::make_shared<data_acq::DataAcqTask>("leo-acq"))
  25. {
  26. rpcListener_ = {// onUpdateDevices
  27. .onUpdateDevices = [&](const std::vector<dbms::Device_t> &) {},
  28. // onUpdateChannels
  29. .onUpdateChannels =
  30. [&](const std::vector<dbms::DeviceChannel_t> &) {
  31. },
  32. // onUpdateFile
  33. .onUpdateFile =
  34. [&](const std::string &filePath) {
  35. LoadJsonFile loadJson;
  36. Json::Value root;
  37. loadJson.LoadJson(filePath, root);
  38. loadJson.ParseJson(root);
  39. SetEvent(eventStart_);
  40. },
  41. // onDeviceWrite
  42. .onDeviceWrite =
  43. [&](const std::string &deviceId,
  44. const std::vector<dbms::AcqItem_t> &items) -> dbms::AcqData_t {
  45. return dataAcqTask_->OnDeviceCommand(deviceId, items);
  46. },
  47. // onDeviceRead
  48. .onDeviceRead =
  49. [&](const std::string &deviceId,
  50. const std::vector<dbms::AcqItem_t> &items) -> dbms::AcqData_t {
  51. return dataAcqTask_->OnDeviceCommand(deviceId, items);
  52. },
  53. .onDeviceCommand =
  54. [&](int32_t channelId,
  55. const std::vector<uint8_t> &cmds) -> std::vector<uint8_t> {
  56. return dataAcqTask_->OnDeviceCommand(channelId, cmds);
  57. }};
  58. }
  59. AcqVendor::~AcqVendor() {}
  60. int AcqVendor::ParseCmdline(const std::string &optname, const std::string &optarg)
  61. {
  62. if (optname == "version") {
  63. printf(DAS_VERSION);
  64. exit(0);
  65. } else if (optname == "load-json") {
  66. LoadJsonFile loadJson;
  67. Json::Value root;
  68. loadJson.LoadJson(optarg, root);
  69. loadJson.ParseJson(root);
  70. exit(0);
  71. } else if (optname == "export-yaml") {
  72. ExportYamlFile yamlFile;
  73. yamlFile.ExportYaml(optarg);
  74. exit(0);
  75. }
  76. return 0;
  77. }
  78. vendor::MyOpt *AcqVendor::GetOpts()
  79. {
  80. return myOpts_;
  81. }
  82. int AcqVendor::GetOptSize()
  83. {
  84. return sizeof(myOpts_) / sizeof(myOpts_[0]);
  85. }
  86. void sigHandler(int sig)
  87. {
  88. HTELINK_LOG_DEBUG("sigHandler, %d", sig);
  89. }
  90. int AcqVendor::Run()
  91. {
  92. std::vector<settings::ConfigParser::DataCenter> dataCenters = configParser_.GetDataCenter();
  93. for(settings::ConfigParser::DataCenter &dc: dataCenters) {
  94. HTELINK_LOG_INFO("add rpc %s, yunId = %d", dc.name.c_str(), dc.yunID);
  95. std::shared_ptr<leoyun::YunRpc> rpc = leoyun::YunRpc::GetRpcInstance(dc.yunID);
  96. ASSERT(rpc != nullptr);
  97. rpc->SetDataCenter(dc);
  98. rpc->RegisterRpc(rpcListener_);
  99. acqTask_->PushRpc(dc.name, rpc);
  100. }
  101. AppendTimerEvent(10 * 60 * 1000, false, [&] {
  102. /* move files if disk full */
  103. utils::check_disk_log(VENDOR_LOG_PATH, configParser_.GetLogDuration());
  104. });
  105. timerCheck_ = AppendTimerEvent(5000, false, acqTask_);
  106. eventStart_ = AppendEvent([&]() {
  107. HTELINK_TRACE("Load Channels");
  108. dataAcqTask_->LoadChannels();
  109. dataAcqTask_->StartAll();
  110. });
  111. eventStop_ = AppendEvent([&]() {
  112. HTELINK_TRACE("Unload Channels");
  113. dataAcqTask_->StopAll();
  114. });
  115. dataAcqTask_->LoadProtocol(VENDOR_CONFIG_PATH + "/xcom_parser.xcom");
  116. dataAcqTask_->Subscribe(
  117. [&](const dbms::Device_t &device, const dbms::AcqData_t &acq) -> bool {
  118. bool result = true;
  119. acqTask_->ForEach([&](const std::shared_ptr<leoyun::YunRpc> &rpc) {
  120. if (!rpc->IsOnline()) {
  121. result = false;
  122. return;
  123. }
  124. if (!rpc->OnDataUpdate(device, acq)) {
  125. result = false;
  126. }
  127. });
  128. return result;
  129. });
  130. dataAcqTask_->SubscribeException(
  131. [&](const dbms::Device_t &device, const std::string &message) {
  132. acqTask_->ForEach([&](const std::shared_ptr<leoyun::YunRpc> &rpc) {
  133. rpc->OnException(device, message);
  134. });
  135. });
  136. SetEvent(eventStart_);
  137. return 0;
  138. }
  139. std::string AcqVendor::Name()
  140. {
  141. return "iot-acq";
  142. }
  143. void AcqVendor::Stop(int signal)
  144. {
  145. HTELINK_LOG_INFO("acq exit, signal = %d", signal);
  146. // StopEvent(timerCheck_);
  147. if (eventStop_ != nullptr) {
  148. SetEvent(eventStop_);
  149. }
  150. }
  151. } // namespace iot_acq