Selaa lähdekoodia

back to bulk apply for status updates in BigTable

Grant Limberg 3 päivää sitten
vanhempi
commit
1d65c1e43e
1 muutettua tiedostoa jossa 17 lisäystä ja 56 poistoa
  1. 17 56
      nonfree/controller/BigTableStatusWriter.cpp

+ 17 - 56
nonfree/controller/BigTableStatusWriter.cpp

@@ -91,6 +91,7 @@ void BigTableStatusWriter::writePending()
 	}
 	fprintf(stderr, "Writing %zu pending status entries to BigTable\n", toWrite.size());
 
+	cbt::BulkMutation bulk;
 	for (const auto& entry : toWrite) {
 		std::string row_key = entry.network_id + "#" + entry.node_id;
 		cbt::SingleRowMutation m(row_key);
@@ -107,65 +108,25 @@ void BigTableStatusWriter::writePending()
 		}
 		int64_t ts = entry.last_seen;
 		m.emplace_back(cbt::SetCell(checkInColumnFamily, lastSeenColumn, std::move(ts)));
+		bulk.emplace_back(m);
+	}
 
-		try {
-			auto status = _table->Apply(std::move(m));
-			if (! status.ok()) {
-				fprintf(stderr, "Error writing to BigTable: %s\n", status.message().c_str());
-			}
-			else {
-				_pubsubWriter->publishStatusChange(
-					entry.target, entry.network_id, entry.node_id, entry.os, entry.arch, entry.version,
-					entry.last_seen);
-			}
-		}
-		catch (const std::exception& e) {
-			fprintf(stderr, "Exception writing to BigTable: %s\n", e.what());
-			span->SetAttribute("error", e.what());
-			span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
-			return;
+	fprintf(stderr, "Applying %zu mutations to BigTable\n", bulk.size());
+
+	try {
+		std::vector<cbt::FailedMutation> failures = _table->BulkApply(std::move(bulk));
+		fprintf(stderr, "BigTable write completed with %zu failures\n", failures.size());
+		for (auto const& r : failures) {
+			// Handle error (log it, retry, etc.)
+			std::cerr << "Error writing to BigTable: " << r.status() << "\n";
 		}
 	}
-	// cbt::BulkMutation bulk;
-	// for (const auto& entry : toWrite) {
-	// 	std::string row_key = entry.network_id + "#" + entry.node_id;
-	// 	cbt::SingleRowMutation m(row_key);
-	// 	m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, osColumn, entry.os));
-	// 	m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, archColumn, entry.arch));
-	// 	m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, versionColumn, entry.version));
-	// 	char buf[64] = { 0 };
-	// 	std::string addressStr = entry.address.toString(buf);
-	// 	if (entry.address.ss_family == AF_INET) {
-	// 		m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv4Column, std::move(addressStr)));
-	// 	}
-	// 	else if (entry.address.ss_family == AF_INET6) {
-	// 		m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv6Column, std::move(addressStr)));
-	// 	}
-	// 	int64_t ts = entry.last_seen;
-	// 	m.emplace_back(cbt::SetCell(checkInColumnFamily, lastSeenColumn, std::move(ts)));
-	// 	bulk.emplace_back(m);
-
-	// 	// TODO: Check performance on this.  May need to bach these.
-	// 	_pubsubWriter->publishStatusChange(
-	// 		entry.target, entry.network_id, entry.node_id, entry.os, entry.arch, entry.version, entry.last_seen);
-	// }
-
-	// fprintf(stderr, "Applying %zu mutations to BigTable\n", bulk.size());
-
-	// try {
-	// 	std::vector<cbt::FailedMutation> failures = table.BulkApply(std::move(bulk));
-	// 	fprintf(stderr, "BigTable write completed with %zu failures\n", failures.size());
-	// 	for (auto const& r : failures) {
-	// 		// Handle error (log it, retry, etc.)
-	// 		std::cerr << "Error writing to BigTable: " << r.status() << "\n";
-	// 	}
-	// }
-	// catch (const std::exception& e) {
-	// 	fprintf(stderr, "Exception writing to BigTable: %s\n", e.what());
-	// 	span->SetAttribute("error", e.what());
-	// 	span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
-	// 	return;
-	// }
+	catch (const std::exception& e) {
+		fprintf(stderr, "Exception writing to BigTable: %s\n", e.what());
+		span->SetAttribute("error", e.what());
+		span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
+		return;
+	}
 }
 
 }	// namespace ZeroTier